You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/04 11:06:39 UTC

[5/6] git commit: [FLINK-1198] Make Broadcast variables shared per taskmanager, rather than a shared singleton per JVM (supports multiple TaskManagers in the same JVM)

[FLINK-1198] Make broadcast variables shared per TaskManager, rather than a shared singleton per JVM
             (supports multiple TaskManagers in the same JVM)


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d8052d48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d8052d48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d8052d48

Branch: refs/heads/master
Commit: d8052d488c999085375c78e31be70bf75e11c420
Parents: cc76653
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 3 15:47:13 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 20:58:49 2014 +0100

----------------------------------------------------------------------
 .../client/minicluster/NepheleMiniCluster.java  | 15 ++++++++++++++
 .../broadcast/BroadcastVariableManager.java     |  9 ---------
 .../flink/runtime/execution/Environment.java    |  4 ++++
 .../runtime/execution/RuntimeEnvironment.java   | 13 +++++++++++-
 .../runtime/operators/RegularPactTask.java      |  7 +++----
 .../flink/runtime/taskmanager/TaskManager.java  |  9 ++++++++-
 .../operators/testutils/MockEnvironment.java    |  9 +++++++++
 .../flink/runtime/taskmanager/TaskTest.java     |  5 +++--
 .../flink/test/util/AbstractTestBase.java       | 21 ++++++++++++++++----
 9 files changed, 71 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index 20ed5cc..e04006c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -29,8 +29,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 
@@ -238,6 +241,18 @@ public class NepheleMiniCluster {
 			}
 		}
 	}
+	
+	/** Returns the local TaskManagers, or null without a JobManager or LocalInstanceManager. */
+	public TaskManager[] getTaskManagers() {
+		final JobManager manager = this.jobManager;
+		if (manager == null) {
+			return null;
+		}
+		
+		final InstanceManager instances = manager.getInstanceManager();
+		return (instances instanceof LocalInstanceManager) ?
+				((LocalInstanceManager) instances).getTaskManagers() : null;
+	}
 
 	// ------------------------------------------------------------------------
 	// Network utility methods

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
index 3cf9722..6d1a816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
@@ -29,15 +29,6 @@ import org.apache.flink.runtime.operators.RegularPactTask;
 
 public class BroadcastVariableManager {
 	
-	/** The singleton instance that keeps track of the shared broadcast variables. */
-	public static final BroadcastVariableManager INSTANCE = new BroadcastVariableManager();
-	
-	/** Prevent external instantiation. */
-	private BroadcastVariableManager() {}
-	
-	
-	// --------------------------------------------------------------------------------------------
-	
 	private final ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?>> variables =
 							new ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?>>(16);
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 63c6135..d0710db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
@@ -231,4 +232,7 @@ public interface Environment {
 	BufferProvider getOutputBufferProvider();
 
 	Map<String, FutureTask<Path>> getCopyTask();
+	
+	
+	BroadcastVariableManager getBroadcastVariableManager();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index dc282b5..715bbd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -124,6 +125,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	private final Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
 	
+	private final BroadcastVariableManager bcVarManager;
+	
 	private LocalBufferPool outputBufferPool;
 	
 	private AtomicBoolean canceled = new AtomicBoolean();
@@ -133,7 +136,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 							ClassLoader userCodeClassLoader,
 							MemoryManager memoryManager, IOManager ioManager,
 							InputSplitProvider inputSplitProvider,
-							AccumulatorProtocol accumulatorProtocolProxy)
+							AccumulatorProtocol accumulatorProtocolProxy,
+							BroadcastVariableManager bcVarManager)
 		throws Exception
 	{
 		Preconditions.checkNotNull(owner);
@@ -142,6 +146,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		Preconditions.checkNotNull(inputSplitProvider);
 		Preconditions.checkNotNull(accumulatorProtocolProxy);
 		Preconditions.checkNotNull(userCodeClassLoader);
+		Preconditions.checkNotNull(bcVarManager);
 		
 		this.owner = owner;
 
@@ -149,6 +154,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		this.ioManager = ioManager;
 		this.inputSplitProvider = inputSplitProvider;
 		this.accumulatorProtocolProxy = accumulatorProtocolProxy;
+		this.bcVarManager = bcVarManager;
 
 		// load and instantiate the invokable class
 		this.userCodeClassLoader = userCodeClassLoader;
@@ -509,6 +515,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	}
 
 	@Override
+	public BroadcastVariableManager getBroadcastVariableManager() {
+		return this.bcVarManager;
+	}
+	
+	@Override
 	public Configuration getTaskConfiguration() {
 		return this.taskConfiguration;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 5cb406b..c3e3515 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -33,7 +33,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.accumulators.AccumulatorEvent;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -433,7 +432,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		
 		final MutableReader<?> reader = this.broadcastInputReaders[inputNum];
 
-		List<X> variable = BroadcastVariableManager.INSTANCE.getBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
+		List<X> variable = getEnvironment().getBroadcastVariableManager().getBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
 		
 		context.setBroadcastVariable(bcVarName, variable);
 	}
@@ -444,7 +443,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 				(superstep > 1 ? ", superstep " + superstep : "")));
 		}
 		
-		BroadcastVariableManager.INSTANCE.releaseReference(bcVarName, superstep, this);
+		getEnvironment().getBroadcastVariableManager().releaseReference(bcVarName, superstep, this);
 		context.clearBroadcastVariable(bcVarName);
 	}
 	
@@ -610,7 +609,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			LOG.debug(formatLogString("Releasing all broadcast variables."));
 		}
 		
-		BroadcastVariableManager.INSTANCE.releaseAllReferencesFromTask(this);
+		getEnvironment().getBroadcastVariableManager().releaseAllReferencesFromTask(this);
 		if (runtimeUdfContext != null) {
 			runtimeUdfContext.clearAllBroadcastVariables();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 1380d91..7220b23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -60,6 +60,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -141,6 +142,8 @@ public class TaskManager implements TaskOperationProtocol {
 
 	private final LibraryCacheManager libraryCacheManager;
 	
+	private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
+	
 	private final Server taskManagerServer;
 
 	private final FileCache fileCache = new FileCache();
@@ -538,6 +541,10 @@ public class TaskManager implements TaskOperationProtocol {
 		return channelManager;
 	}
 	
+	public BroadcastVariableManager getBroadcastVariableManager() {
+		return this.bcVarManager;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Task Operation
 	// --------------------------------------------------------------------------------------------
@@ -599,7 +606,7 @@ public class TaskManager implements TaskOperationProtocol {
 			}
 			
 			final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, executionId);
-			final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy);
+			final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy, this.bcVarManager);
 			task.setEnvironment(env);
 			
 			// register the task with the network stack and profilers

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index bd32f10..5e0edc4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.Buffer;
@@ -74,6 +75,9 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	private final JobID jobID = new JobID();
 
 	private final Buffer mockBuffer;
+	
+	private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
+	
 
 	public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.jobConfiguration = new Configuration();
@@ -351,4 +355,9 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	public JobVertexID getJobVertexId() {
 		return new JobVertexID(new byte[16]);
 	}
+	
+	@Override
+	public BroadcastVariableManager getBroadcastVariableManager() {
+		return this.bcVarManager;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ce31bd0..ef9b7ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -264,7 +265,7 @@ public class TaskTest {
 			
 			RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
 					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
-					mock(AccumulatorProtocol.class));
+					mock(AccumulatorProtocol.class), new BroadcastVariableManager());
 			
 			task.setEnvironment(env);
 			
@@ -302,7 +303,7 @@ public class TaskTest {
 			
 			RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
 					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
-					mock(AccumulatorProtocol.class));
+					mock(AccumulatorProtocol.class), new BroadcastVariableManager());
 			
 			task.setEnvironment(env);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8052d48/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 8460382..c043ea8 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -40,7 +40,7 @@ import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
 
@@ -93,18 +93,31 @@ public abstract class AbstractTestBase {
 	
 	public void stopCluster() throws Exception {
 		try {
+			
+			int numUnreleasedBCVars = 0;
+			{
+				TaskManager[] tms = executor.getTaskManagers();
+				if (tms != null) {
+					for (TaskManager tm : tms) {
+						numUnreleasedBCVars += tm.getBroadcastVariableManager().getNumberOfVariablesWithReferences();
+					}
+				}
+			}
+			
 			if (this.executor != null) {
 				this.executor.stop();
 				this.executor = null;
 				FileSystem.closeAll();
 				System.gc();
 			}
-		} finally {
+			
+			Assert.assertEquals("Not all broadcast variables were released.", 0, numUnreleasedBCVars);
+		}
+		finally {
 			deleteAllTempFiles();
 		}
 		
-		Assert.assertEquals("Not all broadcast variables were released.",
-				0, BroadcastVariableManager.INSTANCE.getNumberOfVariablesWithReferences());
+		
 	}
 
 	//------------------