You are viewing a plain text version of this content. (The link to the canonical version was lost when this copy was extracted.)
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/04 11:06:39 UTC
[5/6] git commit: [FLINK-1198] Make Broadcast variables shared per taskmanager, rather than a shared singleton per JVM (supports multiple TaskManagers in the same JVM)
[FLINK-1198] Make Broadcast variables shared per taskmanager, rather than a shared singleton per JVM
(supports multiple TaskManagers in the same JVM)
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d8052d48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d8052d48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d8052d48
Branch: refs/heads/master
Commit: d8052d488c999085375c78e31be70bf75e11c420
Parents: cc76653
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 3 15:47:13 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 20:58:49 2014 +0100
----------------------------------------------------------------------
.../client/minicluster/NepheleMiniCluster.java | 15 ++++++++++++++
.../broadcast/BroadcastVariableManager.java | 9 ---------
.../flink/runtime/execution/Environment.java | 4 ++++
.../runtime/execution/RuntimeEnvironment.java | 13 +++++++++++-
.../runtime/operators/RegularPactTask.java | 7 +++----
.../flink/runtime/taskmanager/TaskManager.java | 9 ++++++++-
.../operators/testutils/MockEnvironment.java | 9 +++++++++
.../flink/runtime/taskmanager/TaskTest.java | 5 +++--
.../flink/test/util/AbstractTestBase.java | 21 ++++++++++++++++----
9 files changed, 71 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index 20ed5cc..e04006c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -29,8 +29,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -238,6 +241,18 @@ public class NepheleMiniCluster {
}
}
}
+
+ public TaskManager[] getTaskManagers() {
+ JobManager jm = this.jobManager;
+ if (jm != null) {
+ InstanceManager im = jm.getInstanceManager();
+ if (im instanceof LocalInstanceManager) {
+ return ((LocalInstanceManager) im).getTaskManagers();
+ }
+ }
+
+ return null;
+ }
// ------------------------------------------------------------------------
// Network utility methods
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
index 3cf9722..6d1a816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
@@ -29,15 +29,6 @@ import org.apache.flink.runtime.operators.RegularPactTask;
public class BroadcastVariableManager {
- /** The singleton instance that keeps track of the shared broadcast variables. */
- public static final BroadcastVariableManager INSTANCE = new BroadcastVariableManager();
-
- /** Prevent external instantiation. */
- private BroadcastVariableManager() {}
-
-
- // --------------------------------------------------------------------------------------------
-
private final ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?>> variables =
new ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?>>(16);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 63c6135..d0710db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.channels.ChannelID;
@@ -231,4 +232,7 @@ public interface Environment {
BufferProvider getOutputBufferProvider();
Map<String, FutureTask<Path>> getCopyTask();
+
+
+ BroadcastVariableManager getBroadcastVariableManager();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index dc282b5..715bbd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -124,6 +125,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
private final Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+ private final BroadcastVariableManager bcVarManager;
+
private LocalBufferPool outputBufferPool;
private AtomicBoolean canceled = new AtomicBoolean();
@@ -133,7 +136,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
ClassLoader userCodeClassLoader,
MemoryManager memoryManager, IOManager ioManager,
InputSplitProvider inputSplitProvider,
- AccumulatorProtocol accumulatorProtocolProxy)
+ AccumulatorProtocol accumulatorProtocolProxy,
+ BroadcastVariableManager bcVarManager)
throws Exception
{
Preconditions.checkNotNull(owner);
@@ -142,6 +146,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
Preconditions.checkNotNull(inputSplitProvider);
Preconditions.checkNotNull(accumulatorProtocolProxy);
Preconditions.checkNotNull(userCodeClassLoader);
+ Preconditions.checkNotNull(bcVarManager);
this.owner = owner;
@@ -149,6 +154,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
this.ioManager = ioManager;
this.inputSplitProvider = inputSplitProvider;
this.accumulatorProtocolProxy = accumulatorProtocolProxy;
+ this.bcVarManager = bcVarManager;
// load and instantiate the invokable class
this.userCodeClassLoader = userCodeClassLoader;
@@ -509,6 +515,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
}
@Override
+ public BroadcastVariableManager getBroadcastVariableManager() {
+ return this.bcVarManager;
+ }
+
+ @Override
public Configuration getTaskConfiguration() {
return this.taskConfiguration;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 5cb406b..c3e3515 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -33,7 +33,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -433,7 +432,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
final MutableReader<?> reader = this.broadcastInputReaders[inputNum];
- List<X> variable = BroadcastVariableManager.INSTANCE.getBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
+ List<X> variable = getEnvironment().getBroadcastVariableManager().getBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
context.setBroadcastVariable(bcVarName, variable);
}
@@ -444,7 +443,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
(superstep > 1 ? ", superstep " + superstep : "")));
}
- BroadcastVariableManager.INSTANCE.releaseReference(bcVarName, superstep, this);
+ getEnvironment().getBroadcastVariableManager().releaseReference(bcVarName, superstep, this);
context.clearBroadcastVariable(bcVarName);
}
@@ -610,7 +609,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
LOG.debug(formatLogString("Releasing all broadcast variables."));
}
- BroadcastVariableManager.INSTANCE.releaseAllReferencesFromTask(this);
+ getEnvironment().getBroadcastVariableManager().releaseAllReferencesFromTask(this);
if (runtimeUdfContext != null) {
runtimeUdfContext.clearAllBroadcastVariables();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 1380d91..7220b23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -60,6 +60,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -141,6 +142,8 @@ public class TaskManager implements TaskOperationProtocol {
private final LibraryCacheManager libraryCacheManager;
+ private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
+
private final Server taskManagerServer;
private final FileCache fileCache = new FileCache();
@@ -538,6 +541,10 @@ public class TaskManager implements TaskOperationProtocol {
return channelManager;
}
+ public BroadcastVariableManager getBroadcastVariableManager() {
+ return this.bcVarManager;
+ }
+
// --------------------------------------------------------------------------------------------
// Task Operation
// --------------------------------------------------------------------------------------------
@@ -599,7 +606,7 @@ public class TaskManager implements TaskOperationProtocol {
}
final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, executionId);
- final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy);
+ final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy, this.bcVarManager);
task.setEnvironment(env);
// register the task with the network stack and profilers
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index bd32f10..5e0edc4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.Buffer;
@@ -74,6 +75,9 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
private final JobID jobID = new JobID();
private final Buffer mockBuffer;
+
+ private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
+
public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.jobConfiguration = new Configuration();
@@ -351,4 +355,9 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
public JobVertexID getJobVertexId() {
return new JobVertexID(new byte[16]);
}
+
+ @Override
+ public BroadcastVariableManager getBroadcastVariableManager() {
+ return this.bcVarManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ce31bd0..ef9b7ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -264,7 +265,7 @@ public class TaskTest {
RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
- mock(AccumulatorProtocol.class));
+ mock(AccumulatorProtocol.class), new BroadcastVariableManager());
task.setEnvironment(env);
@@ -302,7 +303,7 @@ public class TaskTest {
RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
- mock(AccumulatorProtocol.class));
+ mock(AccumulatorProtocol.class), new BroadcastVariableManager());
task.setEnvironment(env);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 8460382..c043ea8 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -40,7 +40,7 @@ import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assert;
@@ -93,18 +93,31 @@ public abstract class AbstractTestBase {
public void stopCluster() throws Exception {
try {
+
+ int numUnreleasedBCVars = 0;
+ {
+ TaskManager[] tms = executor.getTaskManagers();
+ if (tms != null) {
+ for (TaskManager tm : tms) {
+ numUnreleasedBCVars += tm.getBroadcastVariableManager().getNumberOfVariablesWithReferences();
+ }
+ }
+ }
+
if (this.executor != null) {
this.executor.stop();
this.executor = null;
FileSystem.closeAll();
System.gc();
}
- } finally {
+
+ Assert.assertEquals("Not all broadcast variables were released.", 0, numUnreleasedBCVars);
+ }
+ finally {
deleteAllTempFiles();
}
- Assert.assertEquals("Not all broadcast variables were released.",
- 0, BroadcastVariableManager.INSTANCE.getNumberOfVariablesWithReferences());
+
}
//------------------