You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/05/02 11:36:22 UTC
flink git commit: [FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler
Repository: flink
Updated Branches:
refs/heads/master 33695781f -> 464d6f553
[FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler
The buffer provider is not needed and is most likely a leftover from prior
refactorings.
This closes #3785.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/464d6f55
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/464d6f55
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/464d6f55
Branch: refs/heads/master
Commit: 464d6f553be79409bb25007255e1306884d09186
Parents: 3369578
Author: Zhijiang <wa...@aliyun.com>
Authored: Wed Apr 26 16:18:54 2017 +0800
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue May 2 13:35:18 2017 +0200
----------------------------------------------------------------------
.../runtime/io/network/ConnectionManager.java | 4 +--
.../io/network/LocalConnectionManager.java | 5 +---
.../runtime/io/network/NetworkEnvironment.java | 2 +-
.../network/netty/NettyConnectionManager.java | 5 ++--
.../network/netty/PartitionRequestProtocol.java | 7 ++---
.../netty/PartitionRequestServerHandler.java | 19 ++----------
.../netty/SequenceNumberingViewReader.java | 5 +---
.../partition/PipelinedSubpartition.java | 3 +-
.../io/network/partition/ResultPartition.java | 4 +--
.../partition/ResultPartitionManager.java | 4 +--
.../partition/ResultPartitionProvider.java | 3 --
.../network/partition/ResultSubpartition.java | 3 +-
.../partition/SpillableSubpartition.java | 7 ++---
.../partition/consumer/LocalInputChannel.java | 2 +-
.../netty/CancelPartitionRequestTest.java | 13 ++++----
.../netty/ClientTransportErrorHandlingTest.java | 10 ++-----
.../netty/NettyConnectionManagerTest.java | 7 ++---
.../netty/PartitionRequestQueueTest.java | 5 +---
.../netty/ServerTransportErrorHandlingTest.java | 9 ++----
.../partition/InputChannelTestUtils.java | 7 ++---
.../partition/PipelinedSubpartitionTest.java | 8 ++---
.../partition/SpillableSubpartitionTest.java | 31 ++++++++------------
.../network/partition/SubpartitionTestBase.java | 5 +---
.../consumer/LocalInputChannelTest.java | 8 ++---
.../partition/consumer/SingleInputGateTest.java | 8 ++---
...SuccessAfterNetworkBuffersFailureITCase.java | 2 +-
26 files changed, 62 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 02deb9d..1225230 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -31,8 +30,7 @@ import java.io.IOException;
public interface ConnectionManager {
void start(ResultPartitionProvider partitionProvider,
- TaskEventDispatcher taskEventDispatcher,
- NetworkBufferPool networkbufferPool) throws IOException;
+ TaskEventDispatcher taskEventDispatcher) throws IOException;
/**
* Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 4f51a56..bece6a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -29,9 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
public class LocalConnectionManager implements ConnectionManager {
@Override
- public void start(ResultPartitionProvider partitionProvider,
- TaskEventDispatcher taskEventDispatcher,
- NetworkBufferPool networkbufferPool) {
+ public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4d4b305..cc4cb77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -287,7 +287,7 @@ public class NetworkEnvironment {
try {
LOG.debug("Starting network connection manager");
- connectionManager.start(resultPartitionManager, taskEventDispatcher, networkBufferPool);
+ connectionManager.start(resultPartitionManager, taskEventDispatcher);
}
catch (IOException t) {
throw new IOException("Failed to instantiate network connection manager.", t);
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index abee2a8..fcf618a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import java.io.IOException;
@@ -45,10 +44,10 @@ public class NettyConnectionManager implements ConnectionManager {
}
@Override
- public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool)
+ public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher)
throws IOException {
PartitionRequestProtocol partitionRequestProtocol =
- new PartitionRequestProtocol(partitionProvider, taskEventDispatcher, networkbufferPool);
+ new PartitionRequestProtocol(partitionProvider, taskEventDispatcher);
client.init(partitionRequestProtocol, bufferPool);
server.init(partitionRequestProtocol, bufferPool);
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
index a39f085..d06a018 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty;
import io.netty.channel.ChannelHandler;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
@@ -34,12 +33,10 @@ class PartitionRequestProtocol implements NettyProtocol {
private final ResultPartitionProvider partitionProvider;
private final TaskEventDispatcher taskEventDispatcher;
- private final NetworkBufferPool networkbufferPool;
- PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) {
+ PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
this.partitionProvider = partitionProvider;
this.taskEventDispatcher = taskEventDispatcher;
- this.networkbufferPool = networkbufferPool;
}
// +-------------------------------------------------------------------+
@@ -77,7 +74,7 @@ class PartitionRequestProtocol implements NettyProtocol {
public ChannelHandler[] getServerChannelHandlers() {
PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
- partitionProvider, taskEventDispatcher, queueOfPartitionQueues, networkbufferPool);
+ partitionProvider, taskEventDispatcher, queueOfPartitionQueues);
return new ChannelHandler[] {
messageEncoder,
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 6f56877..1bd05f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
@@ -47,36 +45,24 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
private final PartitionRequestQueue outboundQueue;
- private final NetworkBufferPool networkBufferPool;
-
- private BufferPool bufferPool;
-
PartitionRequestServerHandler(
ResultPartitionProvider partitionProvider,
TaskEventDispatcher taskEventDispatcher,
- PartitionRequestQueue outboundQueue,
- NetworkBufferPool networkBufferPool) {
+ PartitionRequestQueue outboundQueue) {
this.partitionProvider = partitionProvider;
this.taskEventDispatcher = taskEventDispatcher;
this.outboundQueue = outboundQueue;
- this.networkBufferPool = networkBufferPool;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
-
- bufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
-
- if (bufferPool != null) {
- bufferPool.lazyDestroy();
- }
}
@Override
@@ -100,8 +86,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
reader.requestSubpartitionView(
partitionProvider,
request.partitionId,
- request.queueIndex,
- bufferPool);
+ request.queueIndex);
} catch (PartitionNotFoundException notFound) {
respondWithError(ctx, notFound, request.receiverId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index 5036bb7..6d95ca5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -60,8 +59,7 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener {
void requestSubpartitionView(
ResultPartitionProvider partitionProvider,
ResultPartitionID resultPartitionId,
- int subPartitionIndex,
- BufferProvider bufferProvider) throws IOException {
+ int subPartitionIndex) throws IOException {
synchronized (requestLock) {
if (subpartitionView == null) {
@@ -72,7 +70,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener {
this.subpartitionView = partitionProvider.createSubpartitionView(
resultPartitionId,
subPartitionIndex,
- bufferProvider,
this);
} else {
throw new IllegalStateException("Subpartition already requested");
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 4fd74e2..ed72b51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,7 +162,7 @@ class PipelinedSubpartition extends ResultSubpartition {
}
@Override
- public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+ public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
final int queueSize;
synchronized (buffers) {
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index d207f60..9b02e4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -362,7 +362,7 @@ public class ResultPartition implements BufferPoolOwner {
/**
* Returns the requested subpartition.
*/
- public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+ public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
int refCnt = pendingReferences.get();
checkState(refCnt != -1, "Partition released.");
@@ -370,7 +370,7 @@ public class ResultPartition implements BufferPoolOwner {
checkElementIndex(index, subpartitions.length, "Subpartition not found.");
- ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider, availabilityListener);
+ ResultSubpartitionView readView = subpartitions[index].createReadView(availabilityListener);
LOG.debug("Created {}", readView);
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 8ad3e34..f681548 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -22,7 +22,6 @@ import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Table;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +65,6 @@ public class ResultPartitionManager implements ResultPartitionProvider {
public ResultSubpartitionView createSubpartitionView(
ResultPartitionID partitionId,
int subpartitionIndex,
- BufferProvider bufferProvider,
BufferAvailabilityListener availabilityListener) throws IOException {
synchronized (registeredPartitions) {
@@ -79,7 +77,7 @@ public class ResultPartitionManager implements ResultPartitionProvider {
LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition);
- return partition.createSubpartitionView(subpartitionIndex, bufferProvider, availabilityListener);
+ return partition.createSubpartitionView(subpartitionIndex, availabilityListener);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index 3fbfd49..db72d63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-
import java.io.IOException;
public interface ResultPartitionProvider {
@@ -30,7 +28,6 @@ public interface ResultPartitionProvider {
ResultSubpartitionView createSubpartitionView(
ResultPartitionID partitionId,
int index,
- BufferProvider bufferProvider,
BufferAvailabilityListener availabilityListener) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index d3cd887..3b4e3c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import java.io.IOException;
@@ -77,7 +76,7 @@ public abstract class ResultSubpartition {
abstract public void release() throws IOException;
- abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException;
+ abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException;
abstract int releaseMemory() throws IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index a578188..11c6d16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -164,7 +163,7 @@ class SpillableSubpartition extends ResultSubpartition {
}
@Override
- public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+ public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
synchronized (buffers) {
if (!isFinished) {
throw new IllegalStateException("Subpartition has not been finished yet, " +
@@ -180,7 +179,7 @@ class SpillableSubpartition extends ResultSubpartition {
if (spillWriter != null) {
readView = new SpilledSubpartitionView(
this,
- bufferProvider.getMemorySegmentSize(),
+ parent.getBufferProvider().getMemorySegmentSize(),
spillWriter,
getTotalNumberOfBuffers(),
availabilityListener);
@@ -189,7 +188,7 @@ class SpillableSubpartition extends ResultSubpartition {
this,
buffers,
ioManager,
- bufferProvider.getMemorySegmentSize(),
+ parent.getBufferProvider().getMemorySegmentSize(),
availabilityListener);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 4e14e93..3ade2f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -113,7 +113,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
try {
ResultSubpartitionView subpartitionView = partitionManager.createSubpartitionView(
- partitionId, subpartitionIndex, inputGate.getBufferProvider(), this);
+ partitionId, subpartitionIndex, this);
if (subpartitionView == null) {
throw new IOException("Error requesting subpartition.");
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index a2f866a..8b4fc0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -22,7 +22,6 @@ import io.netty.channel.Channel;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -78,18 +77,18 @@ public class CancelPartitionRequestTest {
final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
// Return infinite subpartition
- when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+ when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferAvailabilityListener.class)))
.thenAnswer(new Answer<ResultSubpartitionView>() {
@Override
public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
- BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+ BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
listener.notifyBuffersAvailable(Long.MAX_VALUE);
return view;
}
});
PartitionRequestProtocol protocol = new PartitionRequestProtocol(
- partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
+ partitions, mock(TaskEventDispatcher.class));
serverAndClient = initServerAndClient(protocol);
@@ -129,18 +128,18 @@ public class CancelPartitionRequestTest {
final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
// Return infinite subpartition
- when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+ when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferAvailabilityListener.class)))
.thenAnswer(new Answer<ResultSubpartitionView>() {
@Override
public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
- BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+ BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
listener.notifyBuffersAvailable(Long.MAX_VALUE);
return view;
}
});
PartitionRequestProtocol protocol = new PartitionRequestProtocol(
- partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
+ partitions, mock(TaskEventDispatcher.class));
serverAndClient = initServerAndClient(protocol);
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 22e7754..b4fc46f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -29,7 +29,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -85,8 +84,7 @@ public class ClientTransportErrorHandlingTest {
public ChannelHandler[] getClientChannelHandlers() {
return new PartitionRequestProtocol(
mock(ResultPartitionProvider.class),
- mock(TaskEventDispatcher.class),
- mock(NetworkBufferPool.class)).getClientChannelHandlers();
+ mock(TaskEventDispatcher.class)).getClientChannelHandlers();
}
};
@@ -235,8 +233,7 @@ public class ClientTransportErrorHandlingTest {
public ChannelHandler[] getClientChannelHandlers() {
return new PartitionRequestProtocol(
mock(ResultPartitionProvider.class),
- mock(TaskEventDispatcher.class),
- mock(NetworkBufferPool.class)).getClientChannelHandlers();
+ mock(TaskEventDispatcher.class)).getClientChannelHandlers();
}
};
@@ -383,8 +380,7 @@ public class ClientTransportErrorHandlingTest {
private EmbeddedChannel createEmbeddedChannel() {
PartitionRequestProtocol protocol = new PartitionRequestProtocol(
mock(ResultPartitionProvider.class),
- mock(TaskEventDispatcher.class),
- mock(NetworkBufferPool.class));
+ mock(TaskEventDispatcher.class));
return new EmbeddedChannel(protocol.getClientChannelHandlers());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index 8ab572e..77de6bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -23,7 +23,6 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.util.NetUtils;
import org.junit.Test;
@@ -60,8 +59,7 @@ public class NettyConnectionManagerTest {
connectionManager.start(
mock(ResultPartitionProvider.class),
- mock(TaskEventDispatcher.class),
- mock(NetworkBufferPool.class));
+ mock(TaskEventDispatcher.class));
assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas());
@@ -129,8 +127,7 @@ public class NettyConnectionManagerTest {
connectionManager.start(
mock(ResultPartitionProvider.class),
- mock(TaskEventDispatcher.class),
- mock(NetworkBufferPool.class));
+ mock(TaskEventDispatcher.class));
assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas());
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 7224e96..b969b1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -43,7 +42,6 @@ public class PartitionRequestQueueTest {
ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
ResultPartitionID rpid = new ResultPartitionID();
- BufferProvider bufferProvider = mock(BufferProvider.class);
ResultSubpartitionView view = mock(ResultSubpartitionView.class);
when(view.isReleased()).thenReturn(true);
@@ -52,13 +50,12 @@ public class PartitionRequestQueueTest {
when(partitionProvider.createSubpartitionView(
eq(rpid),
eq(0),
- eq(bufferProvider),
any(BufferAvailabilityListener.class))).thenReturn(view);
EmbeddedChannel ch = new EmbeddedChannel(queue);
SequenceNumberingViewReader seqView = new SequenceNumberingViewReader(new InputChannelID(), queue);
- seqView.requestSubpartitionView(partitionProvider, rpid, 0, bufferProvider);
+ seqView.requestSubpartitionView(partitionProvider, rpid, 0);
// Enqueue the erroneous view
queue.notifyReaderNonEmpty(seqView);
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 1c3557e..3c4ebb3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -23,8 +23,6 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -63,11 +61,11 @@ public class ServerTransportErrorHandlingTest {
final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
when(partitionManager
- .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+ .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)))
.thenAnswer(new Answer<ResultSubpartitionView>() {
@Override
public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
- BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+ BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
listener.notifyBuffersAvailable(Long.MAX_VALUE);
return new CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
}
@@ -78,8 +76,7 @@ public class ServerTransportErrorHandlingTest {
public ChannelHandler[] getServerChannelHandlers() {
return new PartitionRequestProtocol(
partitionManager,
- mock(TaskEventDispatcher.class),
- mock(NetworkBufferPool.class)).getServerChannelHandlers();
+ mock(TaskEventDispatcher.class)).getServerChannelHandlers();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index e292576..a327838 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -60,14 +59,14 @@ class InputChannelTestUtils {
@Override
public ResultSubpartitionView answer(InvocationOnMock invocation) throws Throwable {
- BufferAvailabilityListener channel = (BufferAvailabilityListener) invocation.getArguments()[3];
- return sources[num++].createReadView(null, channel);
+ BufferAvailabilityListener channel = (BufferAvailabilityListener) invocation.getArguments()[2];
+ return sources[num++].createReadView(channel);
}
};
ResultPartitionManager manager = mock(ResultPartitionManager.class);
when(manager.createSubpartitionView(
- any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+ any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)))
.thenAnswer(viewCreator);
return manager;
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index e3200d1..de1e8a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -70,14 +70,14 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
final PipelinedSubpartition subpartition = createSubpartition();
// Successful request
- assertNotNull(subpartition.createReadView(null, new BufferAvailabilityListener() {
+ assertNotNull(subpartition.createReadView(new BufferAvailabilityListener() {
@Override
public void notifyBuffersAvailable(long numBuffers) {
}
}));
try {
- subpartition.createReadView(null, new BufferAvailabilityListener() {
+ subpartition.createReadView(new BufferAvailabilityListener() {
@Override
public void notifyBuffersAvailable(long numBuffers) {
}
@@ -94,7 +94,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
- ResultSubpartitionView view = subpartition.createReadView(null, listener);
+ ResultSubpartitionView view = subpartition.createReadView(listener);
// Empty => should return null
assertNull(view.getNextBuffer());
@@ -221,7 +221,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
final PipelinedSubpartition subpartition = createSubpartition();
TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
- final PipelinedSubpartitionView view = subpartition.createReadView(null, consumer);
+ final PipelinedSubpartitionView view = subpartition.createReadView(consumer);
consumer.setSubpartitionView(view);
Future<Boolean> producerResult = executorService.submit(
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index b53ef68..2b356a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -67,8 +67,12 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
}
@Override
- ResultSubpartition createSubpartition() {
- return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager);
+ SpillableSubpartition createSubpartition() {
+ ResultPartition parent = mock(ResultPartition.class);
+ BufferProvider bufferProvider = mock(BufferProvider.class);
+ when(parent.getBufferProvider()).thenReturn(bufferProvider);
+ when(bufferProvider.getMemorySegmentSize()).thenReturn(32 * 1024);
+ return new SpillableSubpartition(0, parent, ioManager);
}
/**
@@ -138,14 +142,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
@Test
public void testReleasePartitionAndGetNext() throws Exception {
// Create partition and add some buffers
- SpillableSubpartition partition = new SpillableSubpartition(
- 0, mock(ResultPartition.class), ioManager);
+ SpillableSubpartition partition = createSubpartition();
partition.finish();
// Create the read view
ResultSubpartitionView readView = spy(partition
- .createReadView(new TestInfiniteBufferProvider(), new BufferAvailabilityListener() {
+ .createReadView(new BufferAvailabilityListener() {
@Override
public void notifyBuffersAvailable(long numBuffers) {
@@ -168,11 +171,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
*/
@Test
public void testConsumeSpilledPartition() throws Exception {
- ResultPartition parent = mock(ResultPartition.class);
- SpillableSubpartition partition = new SpillableSubpartition(
- 0,
- parent,
- ioManager);
+ SpillableSubpartition partition = createSubpartition();
Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
buffer.retain();
@@ -187,7 +186,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
partition.finish();
BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
- SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener);
+ SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
@@ -216,11 +215,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
*/
@Test
public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
- ResultPartition parent = mock(ResultPartition.class);
- SpillableSubpartition partition = new SpillableSubpartition(
- 0,
- parent,
- ioManager);
+ SpillableSubpartition partition = createSubpartition();
Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
buffer.retain();
@@ -232,7 +227,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
partition.finish();
AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
- SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener);
+ SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener);
// Initial notification
assertEquals(1, listener.getNumNotifiedBuffers());
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 14942bc..800542e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -91,11 +90,9 @@ public abstract class SubpartitionTestBase extends TestLogger {
partition.add(buffer);
partition.finish();
- TestInfiniteBufferProvider buffers = new TestInfiniteBufferProvider();
-
// Create the view
BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
- ResultSubpartitionView view = partition.createReadView(buffers, listener);
+ ResultSubpartitionView view = partition.createReadView(listener);
// The added buffer and end-of-partition event
assertNotNull(view.getNextBuffer());
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 18c3038..fe819a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -198,7 +198,7 @@ public class LocalInputChannelTest {
LocalInputChannel ch = createLocalInputChannel(inputGate, partitionManager, backoff);
when(partitionManager
- .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class)))
+ .createSubpartitionView(eq(ch.partitionId), eq(0), any(BufferAvailabilityListener.class)))
.thenThrow(new PartitionNotFoundException(ch.partitionId));
Timer timer = mock(Timer.class);
@@ -214,7 +214,7 @@ public class LocalInputChannelTest {
// Initial request
ch.requestSubpartition(0);
verify(partitionManager)
- .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class));
+ .createSubpartitionView(eq(ch.partitionId), eq(0), any(BufferAvailabilityListener.class));
// Request subpartition and verify that the actual requests are delayed.
for (long expected : expectedDelays) {
@@ -241,7 +241,7 @@ public class LocalInputChannelTest {
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
when(partitionManager
- .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+ .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)))
.thenReturn(view);
SingleInputGate inputGate = mock(SingleInputGate.class);
@@ -296,7 +296,6 @@ public class LocalInputChannelTest {
.createSubpartitionView(
any(ResultPartitionID.class),
anyInt(),
- any(BufferProvider.class),
any(BufferAvailabilityListener.class)))
.thenAnswer(new Answer<ResultSubpartitionView>() {
@Override
@@ -360,7 +359,6 @@ public class LocalInputChannelTest {
when(partitionManager.createSubpartitionView(
any(ResultPartitionID.class),
anyInt(),
- any(BufferProvider.class),
any(BufferAvailabilityListener.class))).thenReturn(reader);
LocalInputChannel channel = new LocalInputChannel(
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 2d1b4b2..737f17b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -128,7 +127,6 @@ public class SingleInputGateTest {
when(partitionManager.createSubpartitionView(
any(ResultPartitionID.class),
anyInt(),
- any(BufferProvider.class),
any(BufferAvailabilityListener.class))).thenReturn(iterator);
// Setup reader with one local and one unknown input channel
@@ -163,7 +161,7 @@ public class SingleInputGateTest {
inputGate.requestPartitions();
// Only the local channel can request
- verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
+ verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
// Send event backwards and initialize unknown channel afterwards
final TaskEvent event = new TestTaskEvent();
@@ -175,7 +173,7 @@ public class SingleInputGateTest {
// After the update, the pending event should be send to local channel
inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
- verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
+ verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
}
@@ -215,7 +213,7 @@ public class SingleInputGateTest {
ResultPartitionLocation.createLocal()));
verify(partitionManager, never()).createSubpartitionView(
- any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
+ any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 9729b09..710e7df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 840);
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 640);
cluster = new LocalFlinkMiniCluster(config, false);