You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/30 20:07:23 UTC
flink git commit: [FLINK-1801] [FLINK-1465] Network environment can
start prior to TaskManager in "disassociated" mode.
Repository: flink
Updated Branches:
refs/heads/master c89c657ae -> ee273dbe0
[FLINK-1801] [FLINK-1465] Network environment can start prior to TaskManager in "disassociated" mode.
NetworkEnvironment allocates the (memory-heavy) network buffer pool on startup and supports
multiple associations / disassociations with the TaskManager actor.
Fix negative memory report by replacing overflowing ints with longs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee273dbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee273dbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee273dbe
Branch: refs/heads/master
Commit: ee273dbe01e95d2b260fa690e21e2c244a2a5711
Parents: c89c657
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 30 18:32:55 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 30 18:32:55 2015 +0200
----------------------------------------------------------------------
.../runtime/io/network/ConnectionManager.java | 4 +-
.../io/network/LocalConnectionManager.java | 11 +-
.../runtime/io/network/NetworkEnvironment.java | 396 ++++++++++++-------
.../runtime/io/network/TaskEventDispatcher.java | 6 +
.../io/network/buffer/NetworkBufferPool.java | 62 ++-
.../flink/runtime/taskmanager/TaskManager.scala | 24 +-
.../io/network/NetworkEnvironmentTest.java | 124 ++++++
.../io/network/buffer/LocalBufferPoolTest.java | 1 +
.../network/buffer/NetworkBufferPoolTest.java | 134 +++++++
9 files changed, 576 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 06dc151..2f535fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -30,7 +30,9 @@ import java.io.IOException;
*/
public interface ConnectionManager {
- void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException;
+ /**
+ * Starts the connection manager. The NetworkEnvironment calls this when it gets associated
+ * with a TaskManager and JobManager, passing its partition manager (as the partition
+ * provider), its task event dispatcher, and the global network buffer pool.
+ */
+ void start(ResultPartitionProvider partitionProvider,
+ TaskEventDispatcher taskEventDispatcher,
+ NetworkBufferPool networkbufferPool) throws IOException;
/**
* Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index af6273e..410f8ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -22,8 +22,6 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
-import java.io.IOException;
-
/**
* A connection manager implementation to bypass setup overhead for task managers running in local
* execution mode.
@@ -31,11 +29,13 @@ import java.io.IOException;
public class LocalConnectionManager implements ConnectionManager {
@Override
- public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException {
+ public void start(ResultPartitionProvider partitionProvider,
+ TaskEventDispatcher taskEventDispatcher,
+ NetworkBufferPool networkbufferPool) {
+ // intentionally empty: local execution mode skips the network setup
}
@Override
- public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
+ public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
+ // NOTE(review): no client exists in local mode; presumably callers never request remote partitions here - confirm
return null;
}
@@ -48,6 +48,5 @@ public class LocalConnectionManager implements ConnectionManager {
}
@Override
- public void shutdown() throws IOException {
- }
+ // nothing to release: start() does not acquire any resources
+ public void shutdown() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index e5dc8a2..faba252 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -52,207 +52,304 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpd
import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
/**
- * Network I/O components of each {@link TaskManager} instance.
+ * Network I/O components of each {@link TaskManager} instance. The network environment contains
+ * the data structures that keep track of all intermediate results and all data exchanges.
+ *
+ * When initialized, the NetworkEnvironment will allocate the network buffer pool.
+ * All other components (netty, intermediate result managers, ...) are only created once the
+ * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
+ * TaskManager actor gets created and registers itself at the JobManager.
*/
public class NetworkEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
- private final ActorRef taskManager;
+ /**
+ * Lock guarding changes of the association state (connection manager, partition manager,
+ * task event dispatcher, partition consumable notifier) and of the shutdown flag.
+ * NOTE(review): the property getters read these fields without holding this lock.
+ */
+ private final Object lock = new Object();
- private final ActorRef jobManager;
+ private final NetworkEnvironmentConfiguration configuration;
private final FiniteDuration jobManagerTimeout;
- private final ResultPartitionManager partitionManager;
-
- private final TaskEventDispatcher taskEventDispatcher;
-
private final NetworkBufferPool networkBufferPool;
- private final ConnectionManager connectionManager;
+ // The following components are created by associateWithTaskManagerAndJobManager()
+ // and reset to null by disassociate().
+ private ConnectionManager connectionManager;
- private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
+ private ResultPartitionManager partitionManager;
- private final NetworkEnvironmentConfiguration configuration;
+ private TaskEventDispatcher taskEventDispatcher;
+
+ private ResultPartitionConsumableNotifier partitionConsumableNotifier;
private boolean isShutdown;
+
/**
- * Initializes all network I/O components.
+ * Creates the network environment in "disassociated" mode: only the network buffer pool is
+ * allocated here. All other network components are created by
+ * {@link #associateWithTaskManagerAndJobManager(ActorRef, ActorRef)}.
+ *
+ * @param jobManagerTimeout Timeout for the messages sent to the JobManager once associated.
+ * @param config The network configuration (number and size of network buffers, I/O mode,
+ * optional Netty configuration).
+ *
+ * @throws IOException Thrown if the network buffer pool cannot be allocated.
*/
- public NetworkEnvironment(
- ActorRef taskManager,
- ActorRef jobManager,
- FiniteDuration jobManagerTimeout,
- NetworkEnvironmentConfiguration config) throws IOException {
-
- this.taskManager = checkNotNull(taskManager);
- this.jobManager = checkNotNull(jobManager);
- this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
+ public NetworkEnvironment(FiniteDuration jobManagerTimeout,
+ NetworkEnvironmentConfiguration config) throws IOException {
- this.partitionManager = new ResultPartitionManager();
- this.taskEventDispatcher = new TaskEventDispatcher();
this.configuration = checkNotNull(config);
+ this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
- // --------------------------------------------------------------------
- // Network buffers
- // --------------------------------------------------------------------
+ // create the network buffers - this is the operation most likely to fail upon
+ // mis-configuration, so we do this first
try {
networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
}
catch (Throwable t) {
- throw new IOException("Failed to instantiate network buffer pool: " + t.getMessage(), t);
+ throw new IOException("Cannot allocate network buffer pool: " + t.getMessage(), t);
}
+ }
- // --------------------------------------------------------------------
- // Network connections
- // --------------------------------------------------------------------
- final Option<NettyConfig> nettyConfig = config.nettyConfig();
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+ // Except for the buffer pool and the I/O mode, these getters return null while the
+ // environment is not associated.
- connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager();
+ public ResultPartitionManager getPartitionManager() {
+ return partitionManager;
+ }
- try {
- connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
- }
- catch (Throwable t) {
- throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
- }
+ public TaskEventDispatcher getTaskEventDispatcher() {
+ return taskEventDispatcher;
+ }
- this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(this);
+ public ConnectionManager getConnectionManager() {
+ return connectionManager;
}
- public ActorRef getTaskManager() {
- return taskManager;
+ public NetworkBufferPool getNetworkBufferPool() {
+ return networkBufferPool;
}
- public ActorRef getJobManager() {
- return jobManager;
+ public IOMode getDefaultIOMode() {
+ return configuration.ioMode();
}
- public Timeout getJobManagerTimeout() {
- return new Timeout(jobManagerTimeout);
+ public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
+ return partitionConsumableNotifier;
}
- public void registerTask(Task task) throws IOException {
- final ResultPartition[] producedPartitions = task.getProducedPartitions();
- final ResultPartitionWriter[] writers = task.getWriters();
+ // --------------------------------------------------------------------------------------------
+ // Association / Disassociation with JobManager / TaskManager
+ // --------------------------------------------------------------------------------------------
- if (writers.length != producedPartitions.length) {
- throw new IllegalStateException("Unequal number of writers and partitions.");
- }
+ /**
+ * Returns whether the environment is currently associated, i.e. whether the partition
+ * consumable notifier has been created by associateWithTaskManagerAndJobManager() and
+ * not yet cleared by disassociate().
+ */
+ public boolean isAssociated() {
+ return partitionConsumableNotifier != null;
+ }
- for (int i = 0; i < producedPartitions.length; i++) {
- final ResultPartition partition = producedPartitions[i];
- final ResultPartitionWriter writer = writers[i];
+ /**
+ * This associates the network environment with a TaskManager and JobManager.
+ * This will actually start the network components. If the network components cannot be
+ * started, the environment is rolled back and left disassociated.
+ *
+ * @param jobManagerRef The JobManager actor reference.
+ * @param taskManagerRef The TaskManager actor reference.
+ *
+ * @throws IOException Thrown if the network subsystem (Netty) cannot be properly started.
+ * @throws IllegalStateException Thrown if the environment is shut down or already associated.
+ */
+ public void associateWithTaskManagerAndJobManager(ActorRef jobManagerRef, ActorRef taskManagerRef)
+ throws IOException
+ {
+ checkNotNull(jobManagerRef);
+ checkNotNull(taskManagerRef);
+
+ synchronized (lock) {
+ if (isShutdown) {
+ throw new IllegalStateException("environment is shut down");
+ }
- // Buffer pool for the partition
- BufferPool bufferPool = null;
+ if (this.partitionConsumableNotifier == null &&
+ this.partitionManager == null &&
+ this.taskEventDispatcher == null &&
+ this.connectionManager == null)
+ {
+ // good, not currently associated. start the individual components
- try {
- bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
- partition.registerBufferPool(bufferPool);
+ this.partitionManager = new ResultPartitionManager();
+ this.taskEventDispatcher = new TaskEventDispatcher();
+ this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
+ jobManagerRef, taskManagerRef, new Timeout(jobManagerTimeout));
- partitionManager.registerResultPartition(partition);
- }
- catch (Throwable t) {
- if (bufferPool != null) {
- bufferPool.lazyDestroy();
- }
+ // ----- Network connections -----
+ final Option<NettyConfig> nettyConfig = configuration.nettyConfig();
+ connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get())
+ : new LocalConnectionManager();
- if (t instanceof IOException) {
- throw (IOException) t;
+ try {
+ connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
}
- else {
- throw new IOException(t.getMessage(), t);
+ catch (Throwable t) {
+ // Roll back. Otherwise isAssociated() would report true although the connection
+ // manager is not running, and every later association attempt would be rejected.
+ try {
+ connectionManager.shutdown();
+ }
+ catch (Throwable shutdownError) {
+ LOG.debug("Error while shutting down partially started connection manager", shutdownError);
+ }
+ connectionManager = null;
+ partitionManager = null;
+ taskEventDispatcher = null;
+ partitionConsumableNotifier = null;
+
+ throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
}
}
-
- // Register writer with task event dispatcher
- taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
+ else {
+ throw new IllegalStateException(
+ "Network Environment is already associated with a JobManager/TaskManager");
+ }
}
+ }
- // Setup the buffer pool for each buffer reader
- final SingleInputGate[] inputGates = task.getInputGates();
+ /**
+ * Disassociates the network environment from its TaskManager and JobManager. This shuts down
+ * the connection manager and the partition manager, clears the task event dispatcher, and
+ * destroys all local buffer pools so that the network buffer pool gets all buffers back.
+ * The network buffer pool itself stays allocated. Does nothing if not associated.
+ *
+ * All cleanup steps are attempted even if an earlier one fails. The association state is
+ * reset in either case, so the environment can be associated again afterwards.
+ *
+ * @throws IOException Thrown after all cleanup steps ran if the connection manager or the
+ * partition manager could not be shut down. It carries the first such failure.
+ */
+ public void disassociate() throws IOException {
+ synchronized (lock) {
+ if (!isAssociated()) {
+ return;
+ }
- for (SingleInputGate gate : inputGates) {
- BufferPool bufferPool = null;
+ LOG.debug("Disassociating NetworkEnvironment from TaskManager. Cleaning all intermediate results.");
+
+ // first cleanup failure; remembered so that the remaining steps still run
+ IOException firstError = null;
- try {
- bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
- gate.setBufferPool(bufferPool);
- }
- catch (Throwable t) {
- if (bufferPool != null) {
- bufferPool.lazyDestroy();
+ // terminate all network connections
+ if (connectionManager != null) {
+ try {
+ LOG.debug("Shutting down network connection manager");
+ connectionManager.shutdown();
+ }
+ catch (Throwable t) {
+ firstError = new IOException("Cannot shutdown network connection manager", t);
}
+ connectionManager = null;
+ }
- if (t instanceof IOException) {
- throw (IOException) t;
+ // shutdown all intermediate results
+ if (partitionManager != null) {
+ try {
+ LOG.debug("Shutting down intermediate result partition manager");
+ partitionManager.shutdown();
}
- else {
- throw new IOException(t.getMessage(), t);
+ catch (Throwable t) {
+ if (firstError == null) {
+ firstError = new IOException("Cannot shutdown partition manager", t);
+ }
+ else {
+ LOG.warn("Cannot shutdown partition manager", t);
+ }
}
+ partitionManager = null;
}
+
+ partitionConsumableNotifier = null;
+
+ if (taskEventDispatcher != null) {
+ taskEventDispatcher.clearAll();
+ taskEventDispatcher = null;
+ }
+
+ // make sure that the global buffer pool re-acquires all buffers
+ networkBufferPool.destroyAllBufferPools();
+
+ if (firstError != null) {
+ throw firstError;
+ }
}
}
- public void unregisterTask(Task task) {
- LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
- task.getTaskNameWithSubtasks(), task.getExecutionState());
- final ExecutionAttemptID executionId = task.getExecutionId();
- if (task.isCanceledOrFailed()) {
- partitionManager.releasePartitionsProducedBy(executionId);
+ // --------------------------------------------------------------------------------------------
+ // Task operations
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Registers the task's produced partitions and input gates with the network stack. It creates
+ * a local buffer pool for every produced partition and every input gate, registers the
+ * partitions with the partition manager, and registers the writers with the task event
+ * dispatcher.
+ *
+ * @throws IllegalStateException Thrown if the numbers of writers and produced partitions
+ * differ, or if the environment is shut down or not associated.
+ * @throws IOException Thrown if a buffer pool cannot be created or a partition cannot be
+ * registered.
+ */
+ public void registerTask(Task task) throws IOException {
+ final ResultPartition[] producedPartitions = task.getProducedPartitions();
+ final ResultPartitionWriter[] writers = task.getWriters();
+
+ if (writers.length != producedPartitions.length) {
+ throw new IllegalStateException("Unequal number of writers and partitions.");
}
- ResultPartitionWriter[] writers = task.getWriters();
+ synchronized (lock) {
+ if (isShutdown) {
+ throw new IllegalStateException("NetworkEnvironment is shut down");
+ }
+ if (!isAssociated()) {
+ throw new IllegalStateException("NetworkEnvironment is not associated with a TaskManager");
+ }
+
+ for (int i = 0; i < producedPartitions.length; i++) {
+ final ResultPartition partition = producedPartitions[i];
+ final ResultPartitionWriter writer = writers[i];
- if (writers != null) {
- for (ResultPartitionWriter writer : task.getWriters()) {
- taskEventDispatcher.unregisterWriter(writer);
+ // Buffer pool for the partition
+ BufferPool bufferPool = null;
+
+ try {
+ bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
+ partition.registerBufferPool(bufferPool);
+
+ partitionManager.registerResultPartition(partition);
+ }
+ catch (Throwable t) {
+ if (bufferPool != null) {
+ bufferPool.lazyDestroy();
+ }
+
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ else {
+ throw new IOException(t.getMessage(), t);
+ }
+ }
+
+ // Register writer with task event dispatcher
+ taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
}
- }
- final SingleInputGate[] inputGates = task.getInputGates();
+ // Setup the buffer pool for each buffer reader
+ final SingleInputGate[] inputGates = task.getInputGates();
- if (inputGates != null) {
for (SingleInputGate gate : inputGates) {
+ BufferPool bufferPool = null;
+
try {
- if (gate != null) {
- gate.releaseAllResources();
- }
+ bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
+ gate.setBufferPool(bufferPool);
}
- catch (IOException e) {
- LOG.error("Error during release of reader resources: " + e.getMessage(), e);
+ catch (Throwable t) {
+ if (bufferPool != null) {
+ bufferPool.lazyDestroy();
+ }
+
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ else {
+ throw new IOException(t.getMessage(), t);
+ }
}
}
}
}
- public ResultPartitionManager getPartitionManager() {
- return partitionManager;
- }
+ /**
+ * Releases the network resources of the given task. It releases the partitions the task
+ * produced if the task was canceled or failed, unregisters its writers from the task event
+ * dispatcher, and releases the resources of its input gates. Does nothing while the
+ * environment is shut down or not associated.
+ */
+ public void unregisterTask(Task task) {
+ // three placeholders -> three arguments (the execution id was missing)
+ LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
+ task.getTaskNameWithSubtasks(), task.getExecutionId(), task.getExecutionState());
- public TaskEventDispatcher getTaskEventDispatcher() {
- return taskEventDispatcher;
- }
+ final ExecutionAttemptID executionId = task.getExecutionId();
- public ConnectionManager getConnectionManager() {
- return connectionManager;
- }
+ synchronized (lock) {
+ if (isShutdown || !isAssociated()) {
+ // no need to do anything when we are not operational
+ return;
+ }
- public NetworkBufferPool getNetworkBufferPool() {
- return networkBufferPool;
- }
+ if (task.isCanceledOrFailed()) {
+ partitionManager.releasePartitionsProducedBy(executionId);
+ }
- public IOMode getDefaultIOMode() {
- return configuration.ioMode();
- }
+ ResultPartitionWriter[] writers = task.getWriters();
- public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
- return partitionConsumableNotifier;
+ if (writers != null) {
+ for (ResultPartitionWriter writer : writers) {
+ taskEventDispatcher.unregisterWriter(writer);
+ }
+ }
+
+ final SingleInputGate[] inputGates = task.getInputGates();
+
+ if (inputGates != null) {
+ for (SingleInputGate gate : inputGates) {
+ try {
+ if (gate != null) {
+ gate.releaseAllResources();
+ }
+ }
+ catch (IOException e) {
+ LOG.error("Error during release of reader resources: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
}
public boolean hasReleasedAllResources() {
@@ -281,32 +378,25 @@ public class NetworkEnvironment {
* Tries to shut down all network I/O components.
*/
public void shutdown() {
- if (!isShutdown) {
+ synchronized (lock) {
+ // repeated calls are no-ops: isShutdown is set at the end of this synchronized block
+ if (isShutdown) {
+ return;
+ }
+
+ // shut down all connections and free all intermediate result partitions
try {
- if (networkBufferPool != null) {
- networkBufferPool.destroy();
- }
+ disassociate();
}
catch (Throwable t) {
- LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
- }
-
- if (partitionManager != null) {
- try {
- partitionManager.shutdown();
- }
- catch (Throwable t) {
- LOG.warn("Partition manager did not shut down properly: " + t.getMessage(), t);
- }
+ LOG.warn("Network services did not shut down properly: " + t.getMessage(), t);
}
+ // failures of disassociate() are only logged above, so this step always runs
+ // destroy the buffer pool
try {
- if (connectionManager != null) {
- connectionManager.shutdown();
- }
+ networkBufferPool.destroy();
}
catch (Throwable t) {
- LOG.warn("Network connection manager did not shut down properly: " + t.getMessage(), t);
+ LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
}
isShutdown = true;
@@ -320,13 +410,20 @@ public class NetworkEnvironment {
/**
* Notifies the job manager about consumable partitions.
*/
- private static class JobManagerResultPartitionConsumableNotifier
- implements ResultPartitionConsumableNotifier {
+ private static class JobManagerResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+ private final ActorRef jobManager;
+
+ private final ActorRef taskManager;
- private final NetworkEnvironment networkEnvironment;
+ private final Timeout jobManagerMessageTimeout;
- public JobManagerResultPartitionConsumableNotifier(NetworkEnvironment networkEnvironment) {
- this.networkEnvironment = networkEnvironment;
+ /**
+ * @param jobManager Actor that is asked with {@code ScheduleOrUpdateConsumers} messages.
+ * @param taskManager Actor that receives a {@code FailTask} message if such an ask fails.
+ * @param jobManagerMessageTimeout Timeout of the ask to the JobManager.
+ */
+ public JobManagerResultPartitionConsumableNotifier(ActorRef jobManager,
+ ActorRef taskManager,
+ Timeout jobManagerMessageTimeout) {
+ this.jobManager = jobManager;
+ this.taskManager = taskManager;
+ this.jobManagerMessageTimeout = jobManagerMessageTimeout;
}
@Override
@@ -334,23 +431,20 @@ public class NetworkEnvironment {
final ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
- Future<Object> futureResponse = Patterns.ask(
- networkEnvironment.getJobManager(),
- msg,
- networkEnvironment.getJobManagerTimeout());
+ Future<Object> futureResponse = Patterns.ask(jobManager, msg, jobManagerMessageTimeout);
futureResponse.onFailure(new OnFailure() {
@Override
- public void onFailure(Throwable failure) throws Throwable {
+ public void onFailure(Throwable failure) {
LOG.error("Could not schedule or update consumers at the JobManager.", failure);
// Fail task at the TaskManager
FailTask failMsg = new FailTask(
partitionId.getProducerId(),
- new RuntimeException("Could not schedule or update consumers at " +
- "the JobManager.", failure));
+ new RuntimeException("Could not notify JobManager to schedule or update consumers",
+ failure));
- networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
+ taskManager.tell(failMsg, ActorRef.noSender());
}
}, AkkaUtils.globalExecutionContext());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
index 845f72a..82793e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -70,6 +70,12 @@ public class TaskEventDispatcher {
return false;
}
+ /**
+ * Removes all registered writers. The NetworkEnvironment calls this when it disassociates
+ * from its TaskManager.
+ */
+ public void clearAll() {
+ synchronized (registeredWriters) {
+ registeredWriters.clear();
+ }
+ }
+
/**
* Returns the number of currently registered writers.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index c1e62da..cb1f118 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -30,9 +30,12 @@ import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
/**
- * A fixed size pool of {@link MemorySegment} instances for the network stack.
- * <p>
- * This class is thread-safe.
+ * The NetworkBufferPool is a fixed size pool of {@link MemorySegment} instances
+ * for the network stack.
+ *
+ * The NetworkBufferPool creates {@link LocalBufferPool}s from which the individual tasks draw
+ * the buffers for the network data transfer. When new local buffer pools are created, the
+ * NetworkBufferPool dynamically redistributes the buffers between the pools.
*/
public class NetworkBufferPool implements BufferPoolFactory {
@@ -62,7 +65,15 @@ public class NetworkBufferPool implements BufferPoolFactory {
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
this.memorySegmentSize = segmentSize;
- this.availableMemorySegments = new ArrayBlockingQueue<MemorySegment>(numberOfSegmentsToAllocate);
+
+ // long copy of the segment size: the byte products computed below
+ // (number of segments * segment size) can exceed Integer.MAX_VALUE
+ final long sizeInLong = (long) segmentSize;
+
+ try {
+ this.availableMemorySegments = new ArrayBlockingQueue<MemorySegment>(numberOfSegmentsToAllocate);
+ }
+ catch (OutOfMemoryError err) {
+ // NOTE(review): the original OutOfMemoryError is not attached as cause here
+ throw new OutOfMemoryError("Could not allocate buffer queue of length " + numberOfSegmentsToAllocate);
+ }
try {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
@@ -70,15 +81,22 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
}
catch (OutOfMemoryError err) {
- int requiredMb = (numberOfSegmentsToAllocate * segmentSize) >> 20;
- int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
- int missingMb = requiredMb - allocatedMb;
+ int allocated = availableMemorySegments.size();
+
+ // free some memory
+ availableMemorySegments.clear();
+
+ long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20;
+ long allocatedMb = (sizeInLong * allocated) >> 20;
+ long missingMb = requiredMb - allocatedMb;
- throw new OutOfMemoryError("Could not allocate enough memory segments for GlobalBufferPool (required (Mb): " +
- requiredMb + ", allocated (Mb): " + allocatedMb + ", missing (Mb): " + missingMb + ").");
+ throw new OutOfMemoryError("Could not allocate enough memory segments for NetworkBufferPool " +
+ "(required (Mb): " + requiredMb +
+ ", allocated (Mb): " + allocatedMb +
+ ", missing (Mb): " + missingMb + ").");
}
- int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
+ long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;
LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
allocatedMb, availableMemorySegments.size(), segmentSize);
@@ -186,6 +204,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
@Override
public void destroyBufferPool(BufferPool bufferPool) {
+ // the pools tracked by this factory are LocalBufferPools; reject anything else early
+ if (!(bufferPool instanceof LocalBufferPool)) {
+ throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
+ }
+
synchronized (factoryLock) {
if (allBufferPools.remove(bufferPool)) {
managedBufferPools.remove(bufferPool);
@@ -201,6 +223,26 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
}
+ /**
+ * Destroys all buffer pools that allocate their buffers from this
+ * buffer pool (created via {@link #createBufferPool(int, boolean)}).
+ *
+ * @throws IllegalStateException Thrown if, after destroying all pools, any buffer pool is
+ * still registered or any buffers are still required.
+ */
+ public void destroyAllBufferPools() {
+ synchronized (factoryLock) {
+ // create a copy to avoid concurrent modification exceptions
+ LocalBufferPool[] poolsCopy = allBufferPools.toArray(new LocalBufferPool[allBufferPools.size()]);
+
+ for (LocalBufferPool pool : poolsCopy) {
+ pool.lazyDestroy();
+ }
+
+ // some sanity checks
+ if (allBufferPools.size() > 0 || managedBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
+ throw new IllegalStateException("NetworkBufferPool is not empty after destroying all LocalBufferPools");
+ }
+ }
+ }
+
// Must be called from synchronized block
private void redistributeBuffers() throws IOException {
int numManagedBufferPools = managedBufferPools.size();
http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f552717..f558d48 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -319,10 +319,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
_ ! Heartbeat(instanceID, report)
}
-
- case LogMemoryUsage =>
- logMemoryStats()
-
case SendStackTrace =>
val traces = Thread.getAllStackTraces.asScala
val stackTraceStr = traces.map((trace: (Thread, Array[StackTraceElement])) => {
@@ -668,7 +664,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
}
try {
- networkEnvironment = Some(new NetworkEnvironment(self, jobManager, timeout, networkConfig))
+ val env: NetworkEnvironment = new NetworkEnvironment(timeout, networkConfig)
+ try {
+ env.associateWithTaskManagerAndJobManager(jobManager, self)
+ } catch {
+ case t: Throwable =>
+ // The constructor already allocated the network buffer pool. Release it (and any
+ // partially started network components) before propagating the failure.
+ env.shutdown()
+ throw t
+ }
+ networkEnvironment = Some(env)
} catch {
case ioe: IOException =>
log.error(ioe, "Failed to instantiate network environment.")
@@ -726,7 +724,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) }
log.info("Updating FINAL execution state of {} ({}) to {}.", task.getTaskName,
- task.getExecutionId, task.getExecutionState);
+ task.getExecutionId, task.getExecutionState)
self ! UpdateTaskExecutionState(new TaskExecutionState(
task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
@@ -746,8 +744,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
}
} catch {
- case t: Throwable => log.error("Error cleaning up local files from the distributed cache" +
- ".", t)
+ case t: Throwable => log.error(
+ "Error cleaning up local files from the distributed cache.", t)
}
}
@@ -761,16 +759,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
task.unregisterMemoryManager(memoryManager)
}
-
- private def logMemoryStats(): Unit = {
- if (log.isInfoEnabled) {
- val memoryMXBean = ManagementFactory.getMemoryMXBean
- val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
-
- log.info(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
- log.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
- }
- }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
new file mode 100644
index 0000000..bb95f4b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import static org.junit.Assert.*;
+
+import akka.actor.ActorRef;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.junit.Assume;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Some;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+public class NetworkEnvironmentTest {
+
+ /**
+ * Checks the life cycle of a {@link NetworkEnvironment}: right after construction only the
+ * network buffer pool exists, the remaining components are created when the environment is
+ * associated with (mock) JobManager / TaskManager actors and dropped again on disassociation,
+ * association can be repeated, and shutdown releases everything.
+ */
+ @Test
+ public void testAssociateDisassociate() throws Exception {
+ final int BUFFER_SIZE = 1024;
+ final int NUM_BUFFERS = 20;
+
+ final int port;
+ try {
+ port = NetUtils.getAvailablePort();
+ }
+ catch (Exception e) {
+ // no free port available: mark the test as skipped instead of letting it silently pass
+ Assume.assumeNoException(e);
+ return;
+ }
+
+ NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, new Configuration());
+ NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
+ NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf));
+
+ NetworkEnvironment env = new NetworkEnvironment(new FiniteDuration(30, TimeUnit.SECONDS), config);
+
+ try {
+ assertFalse(env.isShutdown());
+ assertFalse(env.isAssociated());
+
+ // the pool must already exist right after construction
+ assertNotNull(env.getNetworkBufferPool());
+ assertEquals(NUM_BUFFERS, env.getNetworkBufferPool().getTotalNumberOfMemorySegments());
+
+ // all other components are still shut down
+ assertNull(env.getConnectionManager());
+ assertNull(env.getPartitionConsumableNotifier());
+ assertNull(env.getTaskEventDispatcher());
+ assertNull(env.getPartitionManager());
+
+ // associate the environment with some mock actors
+ ActorRef jmActor = Mockito.mock(ActorRef.class);
+ ActorRef tmActor = Mockito.mock(ActorRef.class);
+ env.associateWithTaskManagerAndJobManager(jmActor, tmActor);
+
+ assertTrue(env.isAssociated());
+ assertNotNull(env.getConnectionManager());
+ assertNotNull(env.getPartitionConsumableNotifier());
+ assertNotNull(env.getTaskEventDispatcher());
+ assertNotNull(env.getPartitionManager());
+
+ // allocate some buffer pool
+ BufferPool localPool = env.getNetworkBufferPool().createBufferPool(10, false);
+ assertNotNull(localPool);
+
+ // disassociate
+ env.disassociate();
+
+ assertFalse(env.isAssociated());
+ assertNull(env.getConnectionManager());
+ assertNull(env.getPartitionConsumableNotifier());
+ assertNull(env.getTaskEventDispatcher());
+ assertNull(env.getPartitionManager());
+
+ // the global pool survives disassociation, the local pools created from it do not
+ assertNotNull(env.getNetworkBufferPool());
+ assertTrue(localPool.isDestroyed());
+
+ // associate once again
+ jmActor = Mockito.mock(ActorRef.class);
+ tmActor = Mockito.mock(ActorRef.class);
+ env.associateWithTaskManagerAndJobManager(jmActor, tmActor);
+
+ assertTrue(env.isAssociated());
+ assertNotNull(env.getConnectionManager());
+ assertNotNull(env.getPartitionConsumableNotifier());
+ assertNotNull(env.getTaskEventDispatcher());
+ assertNotNull(env.getPartitionManager());
+
+ // shutdown for good
+ env.shutdown();
+
+ assertTrue(env.isShutdown());
+ assertFalse(env.isAssociated());
+ assertNull(env.getConnectionManager());
+ assertNull(env.getPartitionConsumableNotifier());
+ assertNull(env.getTaskEventDispatcher());
+ assertNull(env.getPartitionManager());
+ }
+ finally {
+ // release the buffer pool and network resources even if an assertion above failed
+ if (!env.isShutdown()) {
+ env.shutdown();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index c9a4b6d..9f04d94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -42,6 +42,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.spy;
http://git-wip-us.apache.org/repos/asf/flink/blob/ee273dbe/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
new file mode 100644
index 0000000..b24082e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class NetworkBufferPoolTest {
+
+ /**
+ * Verifies that once a {@link NetworkBufferPool} has been destroyed, creating a new
+ * fixed-size or non-fixed-size buffer pool from it fails with an
+ * {@link IllegalStateException}.
+ */
+ @Test
+ public void testCreatePoolAfterDestroy() throws Exception {
+ final int bufferSize = 128;
+ final int numBuffers = 10;
+
+ NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
+ assertEquals(bufferSize, globalPool.getMemorySegmentSize());
+ assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
+ assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
+ assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
+
+ globalPool.destroy();
+
+ assertTrue(globalPool.isDestroyed());
+
+ try {
+ globalPool.createBufferPool(2, true);
+ fail("Should throw an IllegalStateException");
+ }
+ catch (IllegalStateException e) {
+ // expected
+ }
+
+ try {
+ globalPool.createBufferPool(2, false);
+ fail("Should throw an IllegalStateException");
+ }
+ catch (IllegalStateException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Verifies that {@link NetworkBufferPool#destroyAllBufferPools()} destroys every registered
+ * local pool but not the global pool, that buffers still held at that point go back to the
+ * global pool when recycled, and that new pools can be created afterwards.
+ */
+ @Test
+ public void testDestroyAll() throws Exception {
+ NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
+
+ try {
+ BufferPool fixedPool = globalPool.createBufferPool(2, true);
+ BufferPool nonFixedPool = globalPool.createBufferPool(5, false);
+
+ assertEquals(2, fixedPool.getNumberOfRequiredMemorySegments());
+ assertEquals(5, nonFixedPool.getNumberOfRequiredMemorySegments());
+
+ Buffer[] buffers = {
+ fixedPool.requestBuffer(),
+ fixedPool.requestBuffer(),
+
+ nonFixedPool.requestBuffer(),
+ nonFixedPool.requestBuffer(),
+ nonFixedPool.requestBuffer(),
+ nonFixedPool.requestBuffer(),
+ nonFixedPool.requestBuffer(),
+ nonFixedPool.requestBuffer(),
+ nonFixedPool.requestBuffer(),
+ nonFixedPool.requestBuffer()
+ };
+
+ for (Buffer b : buffers) {
+ assertNotNull(b);
+ assertNotNull(b.getMemorySegment());
+ }
+
+ // all 10 global segments are handed out, so both pools are exhausted
+ assertNull(fixedPool.requestBuffer());
+ assertNull(nonFixedPool.requestBuffer());
+
+ // destroy all allocated ones
+ globalPool.destroyAllBufferPools();
+
+ // check the destroyed status
+ assertFalse(globalPool.isDestroyed());
+ assertTrue(fixedPool.isDestroyed());
+ assertTrue(nonFixedPool.isDestroyed());
+
+ assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
+
+ // buffers are not yet recycled
+ assertEquals(0, globalPool.getNumberOfAvailableMemorySegments());
+
+ // the recycled buffers should go to the global pool
+ for (Buffer b : buffers) {
+ b.recycle();
+ }
+ assertEquals(globalPool.getTotalNumberOfMemorySegments(), globalPool.getNumberOfAvailableMemorySegments());
+
+ // can request no more buffers
+ assertNull(fixedPool.requestBuffer());
+ assertNull(nonFixedPool.requestBuffer());
+
+ // can create a new pool now
+ assertNotNull(globalPool.createBufferPool(10, false));
+ }
+ finally {
+ // free all pools and the global pool's memory even if an assertion above failed
+ globalPool.destroyAllBufferPools();
+ globalPool.destroy();
+ }
+ }
+}