You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/04 11:06:38 UTC

[4/6] git commit: [FLINK-1198] Broadcast variables are shared between tasks in the same TaskManager

[FLINK-1198] Broadcast variables are shared between tasks in the same TaskManager


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cc766533
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cc766533
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cc766533

Branch: refs/heads/master
Commit: cc766533e09c8b59cfb951c86c2e3989720fbeb9
Parents: bbd8209
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 27 13:23:02 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 20:58:49 2014 +0100

----------------------------------------------------------------------
 .../functions/BroadcastVariableInitializer.java |  24 ++
 .../api/common/functions/RuntimeContext.java    |  28 +-
 .../functions/util/RuntimeUDFContext.java       | 117 ++++++---
 .../java/org/apache/flink/util/NetUtils.java    |  12 +-
 .../flink/util/TraversableOnceIterable.java     |  42 +++
 .../functions/util/RuntimeUDFContextTest.java   | 173 +++++++++++++
 .../operators/translation/WrappingFunction.java | 120 +--------
 .../runtime/broadcast/BroadcastVariableKey.java |  81 ++++++
 .../broadcast/BroadcastVariableManager.java     | 113 ++++++++
 .../BroadcastVariableMaterialization.java       | 256 +++++++++++++++++++
 .../DefaultBroadcastVariableInitializer.java    |  48 ++++
 .../MaterializationExpiredException.java        |  23 ++
 .../flink/runtime/execution/Environment.java    |   9 +
 .../runtime/execution/RuntimeEnvironment.java   |   8 +-
 .../task/AbstractIterativePactTask.java         |   8 +-
 .../iterative/task/IterationTailPactTask.java   |   4 +-
 .../runtime/operators/RegularPactTask.java      |  74 ++++--
 .../operators/testutils/MockEnvironment.java    |   6 +
 .../test/compiler/util/OperatorResolver.java    |   1 +
 .../flink/test/util/AbstractTestBase.java       |   4 +
 .../BroadcastVarsNepheleITCase.java             |  10 +-
 .../ConnectedComponentsNepheleITCase.java       |   1 +
 .../test/recordJobs/kmeans/KMeansBroadcast.java |  11 +-
 23 files changed, 977 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
new file mode 100644
index 0000000..7fc81e5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+/**
+ * Creates a custom data structure from the elements of a broadcast variable. An initializer
+ * is handed to {@code RuntimeContext#getBroadcastVariableWithInitializer(String, BroadcastVariableInitializer)},
+ * which returns the object produced by {@link #initializeBroadcastVariable(Iterable)}.
+ * 
+ * @param <T> The type of the elements in the broadcast variable.
+ * @param <O> The type of the data structure created from the elements.
+ */
+public interface BroadcastVariableInitializer<T, O> {
+	
+	/**
+	 * Builds the data structure for the broadcast variable from the given elements.
+	 * 
+	 * @param data The elements of the broadcast variable.
+	 * @return The data structure created from the elements.
+	 */
+	O initializeBroadcastVariable(Iterable<T> data);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 5f6aaa5..7459a5a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -119,11 +119,37 @@ public interface RuntimeContext {
 	/**
 	 * Returns the result bound to the broadcast variable identified by the 
 	 * given {@code name}.
+	 * <p>
+	 * IMPORTANT: The broadcast variable data structure is shared between the parallel
+	 *            tasks on one machine. Any access that modifies its internal state needs to
+	 *            be manually synchronized by the caller.
+	 * 
+	 * @param name The name under which the broadcast variable is registered.
+	 * @return The broadcast variable, materialized as a list of elements.
 	 */
 	<RT> List<RT> getBroadcastVariable(String name);
+	
+	/**
+	 * Returns the result bound to the broadcast variable identified by the 
+	 * given {@code name}. The broadcast variable is returned as a shared data structure
+	 * that is initialized with the given {@link BroadcastVariableInitializer}.
+	 * <p>
+	 * IMPORTANT: The broadcast variable data structure is shared between the parallel
+	 *            tasks on one machine. Any access that modifies its internal state needs to
+	 *            be manually synchronized by the caller.
+	 * 
+	 * @param name The name under which the broadcast variable is registered.
+	 * @param initializer The initializer that creates the shared data structure of the broadcast
+	 *                    variable from the sequence of elements.
+	 * @return The broadcast variable, as the shared data structure created by the initializer.
+	 */
+	<T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer);
 
 	/**
-	 * Returns the distributed cache to get the local tmp file.
+	 * Returns the {@link DistributedCache} to get the local temporary file copies of files otherwise not
+	 * locally accessible.
+	 * 
+	 * @return The distributed cache of the worker executing this instance.
 	 */
 	DistributedCache getDistributedCache();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index c3d9076..95718f5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -30,11 +30,12 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
 
 /**
- * Implementation of the {@link RuntimeContext}, created by runtime UDF operators.
+ * A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
  */
 public class RuntimeUDFContext implements RuntimeContext {
 
@@ -45,20 +46,23 @@ public class RuntimeUDFContext implements RuntimeContext {
 	private final int subtaskIndex;
 
 	private final ClassLoader userCodeClassLoader;
-	
-	private final DistributedCache distributedCache = new DistributedCache();
 
 	private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
+	
+	private final DistributedCache distributedCache = new DistributedCache();
 
-	private final HashMap<String, List<?>> broadcastVars = new HashMap<String, List<?>>();
-
+	private final HashMap<String, Object> initializedBroadcastVars = new HashMap<String, Object>();
+	
+	private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
+	
+	
 	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
 		this.name = name;
 		this.numParallelSubtasks = numParallelSubtasks;
 		this.subtaskIndex = subtaskIndex;
 		this.userCodeClassLoader = userCodeClassLoader;
 	}
-
+	
 	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> cpTasks) {
 		this.name = name;
 		this.numParallelSubtasks = numParallelSubtasks;
@@ -66,6 +70,7 @@ public class RuntimeUDFContext implements RuntimeContext {
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.distributedCache.setCopyTasks(cpTasks);
 	}
+
 	
 	@Override
 	public String getTaskName() {
@@ -117,6 +122,72 @@ public class RuntimeUDFContext implements RuntimeContext {
 		return (Accumulator<V, A>) accumulators.get(name);
 	}
 
+	@Override
+	public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {
+		return this.accumulators;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public <RT> List<RT> getBroadcastVariable(String name) {
+		
+		// check if we have an initialized version
+		Object o = this.initializedBroadcastVars.get(name);
+		if (o != null) {
+			if (o instanceof List) {
+				return (List<RT>) o;
+			}
+			else {
+				throw new IllegalStateException("The broadcast variable with name '" + name + 
+						"' is not a List. A different call must have requested this variable with a BroadcastVariableInitializer.");
+			}
+		}
+		else {
+			List<?> uninitialized = this.uninitializedBroadcastVars.remove(name);
+			if (uninitialized != null) {
+				this.initializedBroadcastVars.put(name, uninitialized);
+				return (List<RT>) uninitialized;
+			}
+			else {
+				throw new IllegalArgumentException("The broadcast variable with name '" + name + "' has not been set.");
+			}
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	@Override
+	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
+		
+		// check if we have an initialized version
+		Object o = this.initializedBroadcastVars.get(name);
+		if (o != null) {
+			return (C) o;
+		}
+		else {
+			List<T> uninitialized = (List<T>) this.uninitializedBroadcastVars.remove(name);
+			if (uninitialized != null) {
+				C result = initializer.initializeBroadcastVariable(uninitialized);
+				this.initializedBroadcastVars.put(name, result);
+				return result;
+			}
+			else {
+				throw new IllegalArgumentException("The broadcast variable with name '" + name + "' has not been set.");
+			}
+		}
+	}
+
+	@Override
+	public DistributedCache getDistributedCache() {
+		return this.distributedCache;
+	}
+	
+	@Override
+	public ClassLoader getUserCodeClassLoader() {
+		return this.userCodeClassLoader;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
 	@SuppressWarnings("unchecked")
 	private <V, A> Accumulator<V, A> getAccumulator(String name,
 			Class<? extends Accumulator<V, A>> accumulatorClass) {
@@ -138,34 +209,18 @@ public class RuntimeUDFContext implements RuntimeContext {
 		}
 		return (Accumulator<V, A>) accumulator;
 	}
-
-	@Override
-	public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {
-		return this.accumulators;
-	}
-
+	
 	public void setBroadcastVariable(String name, List<?> value) {
-		this.broadcastVars.put(name, value);
-	}
-
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public <RT> List<RT> getBroadcastVariable(String name) {
-		if (!this.broadcastVars.containsKey(name)) {
-			throw new IllegalArgumentException("Trying to access an unbound broadcast variable '" 
-					+ name + "'.");
-		}
-		return (List<RT>) this.broadcastVars.get(name);
+		this.uninitializedBroadcastVars.put(name, value);
 	}
-
-	@Override
-	public DistributedCache getDistributedCache() {
-		return this.distributedCache;
+	
+	public void clearBroadcastVariable(String name) {
+		this.uninitializedBroadcastVars.remove(name);
+		this.initializedBroadcastVars.remove(name);
 	}
 	
-	@Override
-	public ClassLoader getUserCodeClassLoader() {
-		return this.userCodeClassLoader;
+	public void clearAllBroadcastVariables() {
+		this.uninitializedBroadcastVars.clear();
+		this.initializedBroadcastVars.clear();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index d2fcab8..c4538ec 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -19,15 +19,17 @@ package org.apache.flink.util;
 
 
 public class NetUtils {
+	
 	/**
-	 * Turn a fully qualified domain name (fqdn) into a hostname.
+	 * Turn a fully qualified domain name (fqdn) into a hostname. If the fqdn has multiple subparts
+	 * (separated by a period '.'), it will take the first part. Otherwise it takes the entire fqdn.
 	 * 
-	 * @param fqdn
-	 * @return
+	 * @param fqdn The fully qualified domain name.
+	 * @return The hostname.
 	 */
 	public static String getHostnameFromFQDN(String fqdn) {
-		if(fqdn == null) {
-			throw new IllegalArgumentException("Input string is null (fqdn)");
+		if (fqdn == null) {
+			throw new IllegalArgumentException("fqdn is null");
 		}
 		int dotPos = fqdn.indexOf('.');
 		if(dotPos == -1) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
new file mode 100644
index 0000000..73e3cd6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.Iterator;
+
+/**
+ * An {@link Iterable} around a single {@link Iterator} that can be traversed only once.
+ * The first call to {@link #iterator()} returns the wrapped iterator, every further call
+ * throws a {@link TraversableOnceException}.
+ * <p>
+ * The hand-out of the iterator is not synchronized.
+ * 
+ * @param <T> The type of the elements returned by the wrapped iterator.
+ */
+public class TraversableOnceIterable<T> implements Iterable<T> {
+	
+	/** The wrapped iterator. Set to {@code null} as soon as it has been handed out. */
+	private Iterator<T> iterator;
+
+	/**
+	 * Creates a new iterable that hands out the given iterator exactly once.
+	 * 
+	 * @param iterator The iterator to return on the first (and only) traversal.
+	 * @throws NullPointerException Thrown, if the given iterator is {@code null}.
+	 */
+	public TraversableOnceIterable(Iterator<T> iterator) {
+		if (iterator == null) {
+			throw new NullPointerException("The iterator must not be null.");
+		}
+		this.iterator = iterator;
+	}
+	
+	/**
+	 * Returns the wrapped iterator on the first call.
+	 * 
+	 * @return The wrapped iterator.
+	 * @throws TraversableOnceException Thrown, if the iterator has already been handed out.
+	 */
+	@Override
+	public Iterator<T> iterator() {
+		if (iterator == null) {
+			throw new TraversableOnceException();
+		}
+		
+		// drop the reference, so that a second traversal attempt is detected
+		final Iterator<T> result = iterator;
+		iterator = null;
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
new file mode 100644
index 0000000..671c3fd
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.junit.Test;
+
+
+/**
+ * Tests the broadcast variable handling of the {@link RuntimeUDFContext}: access to unset
+ * variables, plain list access, access through a {@link BroadcastVariableInitializer}, and
+ * mixing both access styles for the same variable.
+ */
+public class RuntimeUDFContextTest {
+	
+	/**
+	 * Requesting a variable that was never set must fail with an
+	 * {@link IllegalArgumentException}, both with and without an initializer.
+	 */
+	@Test
+	public void testBroadcastVariableNotFound() {
+		try {
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader());
+			
+			try {
+				ctx.getBroadcastVariable("some name");
+				fail("should throw an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+			
+			try {
+				ctx.getBroadcastVariableWithInitializer("some name", new BroadcastVariableInitializer<Object, Object>() {
+					public Object initializeBroadcastVariable(Iterable<Object> data) { return null; }
+				});
+				
+				fail("should throw an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Variables that were set as lists must be returned with the same contents on the
+	 * first and on every repeated access.
+	 */
+	@Test
+	public void testBroadcastVariableSimple() {
+		try {
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader());
+			
+			ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
+			ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
+			
+			List<Integer> list1 = ctx.getBroadcastVariable("name1");
+			List<Double> list2 = ctx.getBroadcastVariable("name2");
+			
+			assertEquals(Arrays.asList(1, 2, 3, 4), list1);
+			assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list2);
+			
+			// access again
+			List<Integer> list3 = ctx.getBroadcastVariable("name1");
+			List<Double> list4 = ctx.getBroadcastVariable("name2");
+			
+			assertEquals(Arrays.asList(1, 2, 3, 4), list3);
+			assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list4);
+			
+			// and again ;-)
+			List<Integer> list5 = ctx.getBroadcastVariable("name1");
+			List<Double> list6 = ctx.getBroadcastVariable("name2");
+			
+			assertEquals(Arrays.asList(1, 2, 3, 4), list5);
+			assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list6);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * A variable requested with an initializer must yield the initializer's result, also on
+	 * repeated requests. A later request without initializer succeeds here, because the
+	 * initialized result happens to be a list.
+	 */
+	@Test
+	public void testBroadcastVariableWithInitializer() {
+		try {
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader());
+			
+			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+			
+			// access it the first time with an initializer
+			List<Double> list = ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
+			assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list);
+			
+			// access it the second time with an initializer (which might not get executed)
+			List<Double> list2 = ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
+			assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list2);
+			
+			// access it the third time without an initializer (should work by "chance", because the result is a list)
+			List<Double> list3 = ctx.getBroadcastVariable("name");
+			assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list3);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * A variable that was initialized into a non-list object must not be accessible through
+	 * the plain list accessor; that request must fail with an {@link IllegalStateException}.
+	 */
+	@Test
+	public void testBroadcastVariableWithInitializerAndMismatch() {
+		try {
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader());
+			
+			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+			
+			// access it the first time with an initializer
+			int sum = ctx.getBroadcastVariableWithInitializer("name", new SumInitializer());
+			assertEquals(10, sum);
+			
+			// access it the second time with no initializer -> should fail due to type mismatch
+			try {
+				ctx.getBroadcastVariable("name");
+				fail("should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/** Initializer that converts the integer elements into a list of doubles. */
+	private static final class ConvertingInitializer implements BroadcastVariableInitializer<Integer, List<Double>> {
+		@Override
+		public List<Double> initializeBroadcastVariable(Iterable<Integer> data) {
+			List<Double> list = new ArrayList<Double>();
+			
+			for (Integer i : data) {
+				list.add(i.doubleValue());
+			}
+			return list;
+		}
+	}
+	
+	/** Initializer that reduces the integer elements to their sum (a non-list result). */
+	private static final class SumInitializer implements BroadcastVariableInitializer<Integer, Integer> {
+		@Override
+		public Integer initializeBroadcastVariable(Iterable<Integer> data) {
+			int sum = 0;
+			
+			for (Integer i : data) {
+				sum += i;
+			}
+			return sum;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 391ef7f..5a2ab4c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -18,24 +18,11 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Value;
 
 
 public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {
@@ -63,115 +50,10 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
 	public void setRuntimeContext(RuntimeContext t) {
 		super.setRuntimeContext(t);
 		
-		if (t instanceof IterationRuntimeContext) {
-			FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingIterationRuntimeContext(t));
-		}
-		else{
-			FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingRuntimeContext(t));
-		}
+		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
 	}
 
 	public T getWrappedFunction () {
 		return this.wrappedFunction;
 	}
-	
-	
-	private static class WrappingRuntimeContext implements RuntimeContext {
-
-		protected final RuntimeContext context;
-		
-		
-		protected WrappingRuntimeContext(RuntimeContext context) {
-			this.context = context;
-		}
-
-		@Override
-		public String getTaskName() {
-			return context.getTaskName();
-		}
-
-		@Override
-		public int getNumberOfParallelSubtasks() {
-			return context.getNumberOfParallelSubtasks();
-		}
-
-		@Override
-		public int getIndexOfThisSubtask() {
-			return context.getIndexOfThisSubtask();
-		}
-
-
-		@Override
-		public <V, A> void addAccumulator(String name, Accumulator<V, A> accumulator) {
-			context.<V, A>addAccumulator(name, accumulator);
-		}
-
-		@Override
-		public <V, A> Accumulator<V, A> getAccumulator(String name) {
-			return context.<V, A>getAccumulator(name);
-		}
-
-		@Override
-		public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {
-			return context.getAllAccumulators();
-		}
-
-		@Override
-		public IntCounter getIntCounter(String name) {
-			return context.getIntCounter(name);
-		}
-
-		@Override
-		public LongCounter getLongCounter(String name) {
-			return context.getLongCounter(name);
-		}
-
-		@Override
-		public DoubleCounter getDoubleCounter(String name) {
-			return context.getDoubleCounter(name);
-		}
-
-		@Override
-		public Histogram getHistogram(String name) {
-			return context.getHistogram(name);
-		}
-
-		@Override
-		public <RT> List<RT> getBroadcastVariable(String name) {
-			List<RT> refColl = context.getBroadcastVariable(name);
-			return new ArrayList<RT>(refColl);
-		}
-
-		@Override
-		public DistributedCache getDistributedCache() {
-			return context.getDistributedCache();
-		}
-		
-		@Override
-		public ClassLoader getUserCodeClassLoader() {
-			return context.getUserCodeClassLoader();
-		}
-	}
-	
-	private static class WrappingIterationRuntimeContext extends WrappingRuntimeContext implements IterationRuntimeContext {
-
-		protected WrappingIterationRuntimeContext(RuntimeContext context) {
-			super(context);
-		}
-
-		@Override
-		public int getSuperstepNumber() {
-			return ((IterationRuntimeContext) context).getSuperstepNumber();
-		}
-
-		@Override
-		public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-			return ((IterationRuntimeContext) context).<T>getIterationAggregator(name);
-		}
-
-		@Override
-		public <T extends Value> T getPreviousIterationAggregate(String name) {
-			return ((IterationRuntimeContext) context).<T>getPreviousIterationAggregate(name);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
new file mode 100644
index 0000000..cfa87ae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * An immutable key made up of a job vertex id, a broadcast variable name, and a superstep
+ * number. Two keys are equal if all three components are equal, which makes the key usable
+ * in hash based maps.
+ */
+public class BroadcastVariableKey {
+
+	private final JobVertexID vertexId;
+	
+	private final String name;
+	
+	private final int superstep;
+
+	/**
+	 * Creates a new key from the given components.
+	 * 
+	 * @param vertexId The id of the job vertex, must not be {@code null}.
+	 * @param name The name of the broadcast variable, must not be {@code null}.
+	 * @param superstep The superstep number, must be larger than zero.
+	 * @throws IllegalArgumentException Thrown, if the vertex id or the name is {@code null},
+	 *                                  or if the superstep is not larger than zero.
+	 */
+	public BroadcastVariableKey(JobVertexID vertexId, String name, int superstep) {
+		// check the arguments one by one, so the message tells which one was invalid
+		if (vertexId == null) {
+			throw new IllegalArgumentException("The vertex id must not be null.");
+		}
+		if (name == null) {
+			throw new IllegalArgumentException("The broadcast variable name must not be null.");
+		}
+		if (superstep <= 0) {
+			throw new IllegalArgumentException("The superstep must be larger than zero, but was " + superstep + '.');
+		}
+		
+		this.vertexId = vertexId;
+		this.name = name;
+		this.superstep = superstep;
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	
+	/**
+	 * @return The id of the job vertex of this key.
+	 */
+	public JobVertexID getVertexId() {
+		return vertexId;
+	}
+	
+	/**
+	 * @return The name of the broadcast variable of this key.
+	 */
+	public String getName() {
+		return name;
+	}
+	
+	/**
+	 * @return The superstep number of this key.
+	 */
+	public int getSuperstep() {
+		return superstep;
+	}
+	
+	// ---------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return 31 * superstep +
+				47 * name.hashCode() +
+				83 * vertexId.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		// exact class match; null and foreign types are never equal
+		if (obj != null && obj.getClass() == BroadcastVariableKey.class) {
+			BroadcastVariableKey other = (BroadcastVariableKey) obj;
+			return this.superstep == other.superstep &&
+					this.name.equals(other.name) &&
+					this.vertexId.equals(other.vertexId);
+		}
+		else {
+			return false;
+		}
+	}
+	
+	@Override
+	public String toString() {
+		return vertexId + " \"" + name + "\" (" + superstep + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
new file mode 100644
index 0000000..3cf9722
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.operators.RegularPactTask;
+
+/**
+ * Singleton registry of the materialized broadcast variables. Tasks that request a variable under
+ * the same {@link BroadcastVariableKey} (job vertex, name, superstep) share one
+ * {@link BroadcastVariableMaterialization}. An entry is removed from the registry when the
+ * materialization reports that its last reference was released.
+ */
+public class BroadcastVariableManager {
+	
+	/** The singleton instance that keeps track of the shared broadcast variables. */
+	public static final BroadcastVariableManager INSTANCE = new BroadcastVariableManager();
+	
+	/** Prevent external instantiation. */
+	private BroadcastVariableManager() {}
+	
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/** All currently registered materializations, by their key. */
+	private final ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?>> variables =
+							new ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?>>(16);
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Gets the broadcast variable with the given name and superstep for the given task, and registers
+	 * the task as a reference holder. If no materialization is registered under the key yet, a new one
+	 * is registered. If the registered materialization has expired concurrently, the call retries with
+	 * a fresh materialization.
+	 * 
+	 * @param name The name of the broadcast variable.
+	 * @param superstep The superstep for which the variable is requested.
+	 * @param holder The task that requests the variable. The job vertex ID of its environment is part of the key.
+	 * @param reader The reader for the broadcast input of the requesting task.
+	 * @param serializerFactory The factory for the serializer of the variable's elements.
+	 * @return The materialized broadcast variable.
+	 * 
+	 * @throws IOException Forwarded from the materialization.
+	 */
+	public <T> List<T> getBroadcastVariable(String name, int superstep, RegularPactTask<?, ?> holder, 
+			MutableReader<?> reader, TypeSerializerFactory<T> serializerFactory) throws IOException
+	{
+		final BroadcastVariableKey key = new BroadcastVariableKey(holder.getEnvironment().getJobVertexId(), name, superstep);
+		
+		while (true) {
+			final BroadcastVariableMaterialization<T> newMat = new BroadcastVariableMaterialization<T>(key);
+			
+			final BroadcastVariableMaterialization<?> previous = variables.putIfAbsent(key, newMat);
+			
+			@SuppressWarnings("unchecked")
+			final BroadcastVariableMaterialization<T> materialization = (previous == null) ? newMat : (BroadcastVariableMaterialization<T>) previous;
+			
+			try {
+				return materialization.getMaterializedVariable(reader, serializerFactory, holder);
+			}
+			catch (MaterializationExpiredException e) {
+				// concurrent release. as an optimization, try to replace the expired one with a fresh version. otherwise
+				// we might spin for a while until the releaser removes the variable
+				// NOTE: This would also catch a bug that prevented an expired materialization from ever being removed,
+				//       so it acts as a future safeguard
+				
+				// if the expired materialization is the one we registered ourselves (previous == null), we need
+				// a new instance. we must not pass 'previous' to replace() in that case, because
+				// ConcurrentHashMap rejects a null old value with a NullPointerException
+				final BroadcastVariableMaterialization<T> replacement =
+						(previous == null) ? new BroadcastVariableMaterialization<T>(key) : newMat;
+				
+				if (variables.replace(key, materialization, replacement)) {
+					try {
+						return replacement.getMaterializedVariable(reader, serializerFactory, holder);
+					}
+					catch (MaterializationExpiredException ee) {
+						// can still happen in cases of extreme races and fast tasks
+						// fall through the loop;
+					}
+				}
+				// else fall through the loop
+			}
+		}
+	}
+	
+	
+	/**
+	 * Releases the reference that the given task holds to the broadcast variable with the given name
+	 * and superstep. The key is built from the job vertex ID of the task's environment.
+	 * 
+	 * @param name The name of the broadcast variable.
+	 * @param superstep The superstep for which the variable was requested.
+	 * @param referenceHolder The task that releases its reference.
+	 * 
+	 * @throws IllegalStateException Thrown, if no variable is registered under the key, or if the
+	 *                               materialization rejects the release.
+	 */
+	public void releaseReference(String name, int superstep, RegularPactTask<?, ?> referenceHolder) {
+		BroadcastVariableKey key = new BroadcastVariableKey(referenceHolder.getEnvironment().getJobVertexId(), name, superstep);
+		releaseReference(key, referenceHolder);
+	}
+	
+	/**
+	 * Releases the reference that the given task holds to the broadcast variable registered under the
+	 * given key. If this was the last reference, the entry is removed from the registry.
+	 * 
+	 * @param key The key of the broadcast variable.
+	 * @param referenceHolder The task that releases its reference.
+	 * 
+	 * @throws IllegalStateException Thrown, if no variable is registered under the key, or if the
+	 *                               materialization rejects the release.
+	 */
+	public void releaseReference(BroadcastVariableKey key, RegularPactTask<?, ?> referenceHolder) {
+		BroadcastVariableMaterialization<?> mat = variables.get(key);
+		
+		// fail with a descriptive error rather than a NullPointerException if nothing is registered
+		if (mat == null) {
+			throw new IllegalStateException("Releasing reference to broadcast variable that is not registered: " + key);
+		}
+		
+		// release this reference
+		if (mat.decrementReference(referenceHolder)) {
+			// remove if no one holds a reference and no one concurrently replaced the entry
+			variables.remove(key, mat);
+		}
+	}
+	
+	
+	/**
+	 * Releases all references that the given task holds, across all registered variables. Variables
+	 * to which the task holds no reference are left untouched.
+	 * 
+	 * @param referenceHolder The task whose references are released.
+	 */
+	public void releaseAllReferencesFromTask(RegularPactTask<?, ?> referenceHolder) {
+		// go through all registered variables 
+		for (Map.Entry<BroadcastVariableKey, BroadcastVariableMaterialization<?>> entry : variables.entrySet()) {
+			BroadcastVariableMaterialization<?> mat = entry.getValue();
+			
+			// release the reference
+			if (mat.decrementReferenceIfHeld(referenceHolder)) {
+				// remove if no one holds a reference and no one concurrently replaced the entry
+				variables.remove(entry.getKey(), mat);
+			}
+		}
+	}
+	
+	/**
+	 * Gets the number of entries that are currently registered.
+	 * 
+	 * @return The current size of the registry.
+	 */
+	public int getNumberOfVariablesWithReferences() {
+		return this.variables.size();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
new file mode 100644
index 0000000..fce29de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.util.ReaderIterator;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * One shared materialization of a broadcast variable. The materialization tracks the set of tasks
+ * that reference it. The first task that registers reads its broadcast input into a list; tasks
+ * that register later drain their own input and wait until the first task has finished. When the
+ * last reference is released, the materialization is marked as disposed and drops the data.
+ * 
+ * @param <T> The type of the elements in the broadcasted data set.
+ */
+public class BroadcastVariableMaterialization<T> {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(BroadcastVariableMaterialization.class);
+	
+	
+	/** The tasks that currently hold a reference. Also used as the lock for the reference tracking. */
+	private final Set<RegularPactTask<?, ?>> references = new HashSet<RegularPactTask<?,?>>();
+	
+	/** The monitor on which tasks wait for the materialization to complete. */
+	private final Object materializationMonitor = new Object();
+	
+	private final BroadcastVariableKey key;
+	
+	private ArrayList<T> data;
+	
+	private boolean materialized;
+	
+	private boolean disposed;
+	
+	
+	/**
+	 * Creates a new, not yet materialized and not yet referenced, materialization.
+	 * 
+	 * @param key The key of the broadcast variable, used in log and error messages.
+	 */
+	public BroadcastVariableMaterialization(BroadcastVariableKey key) {
+		this.key = key;
+	}
+	
+
+	/**
+	 * Registers the given task as a reference holder and returns the materialized variable. The task
+	 * that registers first reads all elements from its reader into the shared list. All other tasks
+	 * read and discard the elements from their reader, and then wait for the shared list.
+	 * If this method fails, the reference of the task is released again.
+	 * 
+	 * @param reader The reader for the broadcast input of the task.
+	 * @param serializerFactory The factory for the serializer of the elements.
+	 * @param referenceHolder The task that requests the variable.
+	 * @return The materialized variable.
+	 * 
+	 * @throws MaterializationExpiredException Thrown, if this materialization was already disposed.
+	 * @throws IOException Thrown, if reading the input fails. Any other failure is wrapped in an IOException.
+	 */
+	public List<T> getMaterializedVariable(MutableReader<?> reader, TypeSerializerFactory<?> serializerFactory, RegularPactTask<?, ?> referenceHolder)
+			throws MaterializationExpiredException, IOException
+	{
+		Preconditions.checkNotNull(reader);
+		Preconditions.checkNotNull(serializerFactory);
+		Preconditions.checkNotNull(referenceHolder);
+		
+		final boolean materializer;
+		
+		// hold the reference lock only while we track references and decide who should be the materializer
+		// that way, other tasks can de-register (in case of failure) while materialization is happening
+		synchronized (references) {
+			if (disposed) {
+				throw new MaterializationExpiredException();
+			}
+			
+			// sanity check
+			if (!references.add(referenceHolder)) {
+				throw new IllegalStateException(
+						String.format("The task %s (%d/%d) already holds a reference to the broadcast variable %s.",
+								referenceHolder.getEnvironment().getTaskName(),
+								referenceHolder.getEnvironment().getIndexInSubtaskGroup() + 1,
+								referenceHolder.getEnvironment().getCurrentNumberOfSubtasks(),
+								key.toString()));
+			}
+			
+			materializer = references.size() == 1;
+		}
+
+		try {
+			@SuppressWarnings("unchecked")
+			final MutableReader<DeserializationDelegate<T>> typedReader = (MutableReader<DeserializationDelegate<T>>) reader;
+			@SuppressWarnings("unchecked")
+			final TypeSerializer<T> serializer = ((TypeSerializerFactory<T>) serializerFactory).getSerializer();
+			
+			final ReaderIterator<T> readerIterator = new ReaderIterator<T>(typedReader, serializer);
+			
+			if (materializer) {
+				// first one, so we need to materialize;
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Getting Broadcast Variable (" + key + ") - First access, materializing.");
+				}
+				
+				ArrayList<T> data = new ArrayList<T>();
+				
+				// a new instance is created per element, because the elements are retained in the list
+				T element;
+				while ((element = readerIterator.next(serializer.createInstance())) != null) {
+					data.add(element);
+				}
+				
+				synchronized (materializationMonitor) {
+					this.data = data;
+					this.materialized = true;
+					materializationMonitor.notifyAll();
+				}
+				
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Materialization of Broadcast Variable (" + key + ") finished.");
+				}
+			}
+			else {
+				// successor: discard all data and refer to the shared variable
+				
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Getting Broadcast Variable (" + key + ") - shared access.");
+				}
+				
+				// the elements are discarded, so one instance can be reused for all of them
+				T element = serializer.createInstance();
+				while ((element = readerIterator.next(element)) != null);
+				
+				// NOTE(review): the only notifyAll() on this monitor happens after a successful
+				// materialization. if the materializing task fails, nothing here wakes up the
+				// waiting tasks - confirm that they are interrupted/canceled from the outside.
+				synchronized (materializationMonitor) {
+					while (!this.materialized) {
+						materializationMonitor.wait();
+					}
+				}
+				
+			}
+			
+			return this.data;
+		}
+		catch (Throwable t) {
+			// in case of an exception, we need to clean up big time
+			decrementReferenceIfHeld(referenceHolder);
+			
+			if (t instanceof InterruptedException) {
+				// the exception is wrapped below, so restore the flag to not lose the interruption
+				Thread.currentThread().interrupt();
+			}
+			
+			if (t instanceof IOException) {
+				throw (IOException) t;
+			} else {
+				throw new IOException("Materialization of the broadcast variable failed.", t);
+			}
+		}
+	}
+	
+	/**
+	 * Releases the reference of the given task. Fails, if the materialization is no longer alive, or
+	 * if the task holds no reference.
+	 * 
+	 * @param referenceHolder The task that releases its reference.
+	 * @return True, if this was the last reference and the materialization was disposed, false otherwise.
+	 * 
+	 * @throws IllegalStateException Thrown, if the materialization is disposed or has no references,
+	 *                               or if the task holds no reference.
+	 */
+	public boolean decrementReference(RegularPactTask<?, ?> referenceHolder) {
+		return decrementReferenceInternal(referenceHolder, true);
+	}
+	
+	/**
+	 * Releases the reference of the given task, if it holds one. Unlike
+	 * {@link #decrementReference(RegularPactTask)}, this method does not fail when there is nothing
+	 * to release.
+	 * 
+	 * @param referenceHolder The task that releases its reference.
+	 * @return True, if this was the last reference and the materialization was disposed, false otherwise.
+	 */
+	public boolean decrementReferenceIfHeld(RegularPactTask<?, ?> referenceHolder) {
+		return decrementReferenceInternal(referenceHolder, false);
+	}
+	
+	/**
+	 * Removes the task from the set of reference holders and disposes the materialization, if no
+	 * reference is left.
+	 * 
+	 * @param referenceHolder The task that releases its reference.
+	 * @param errorIfNoReference Flag whether to throw an exception (true) or to return false, when
+	 *                           there is no reference to release.
+	 * @return True, if the last reference was released by this call, false otherwise.
+	 */
+	private boolean decrementReferenceInternal(RegularPactTask<?, ?> referenceHolder, boolean errorIfNoReference) {
+		synchronized (references) {
+			if (disposed || references.isEmpty()) {
+				if (errorIfNoReference) {
+					throw new IllegalStateException("Decrementing reference to broadcast variable that is no longer alive.");
+				} else {
+					return false;
+				}
+			}
+			
+			if (!references.remove(referenceHolder)) {
+				if (errorIfNoReference) {
+					throw new IllegalStateException(
+							String.format("The task %s (%d/%d) did not hold a reference to the broadcast variable %s.",
+									referenceHolder.getEnvironment().getTaskName(),
+									referenceHolder.getEnvironment().getIndexInSubtaskGroup() + 1,
+									referenceHolder.getEnvironment().getCurrentNumberOfSubtasks(),
+									key.toString()));
+				} else {
+					return false;
+				}
+			}
+			
+			
+			if (references.isEmpty()) {
+				// last reference is gone: expire this materialization and drop the data
+				disposed = true;
+				data = null;
+				return true;
+			} else {
+				return false;
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Adapter from a {@link MutableObjectIterator} to a {@link java.util.Iterator}, which creates a
+	 * new instance through the serializer for every element it reads.
+	 */
+	@SuppressWarnings("unused")
+	private static final class NewObjectCreatingIterator<T> implements Iterator<T> {
+		
+		private final TypeSerializer<T> serializer;
+		
+		/** The source iterator. Set to null once it is exhausted. */
+		private MutableObjectIterator<T> iterator;
+		
+		/** The element that was fetched by hasNext(), but not yet handed out by next(). */
+		private T next;
+		
+		
+		public NewObjectCreatingIterator(TypeSerializer<T> serializer, MutableObjectIterator<T> iterator) {
+			this.serializer = serializer;
+			this.iterator = iterator;
+		}
+
+		
+		@Override
+		public boolean hasNext() {
+			if (next != null) {
+				return true;
+			}
+			else if (iterator != null) {
+				try {
+					next = iterator.next(serializer.createInstance());
+					if (next != null) {
+						return true;
+					} else {
+						iterator = null;
+						return false;
+					}
+				} catch (IOException e) {
+					throw new RuntimeException("Error getting element for broadcast variable.", e);
+				}
+			}
+			else {
+				return false;
+			}
+		}
+		
+		@Override
+		public T next() {
+			if (hasNext()) {
+				// hand the buffered element out exactly once. without clearing it, every call
+				// would return the same element and hasNext() would never become false
+				final T result = next;
+				next = null;
+				return result;
+			} else {
+				throw new NoSuchElementException();
+			}
+		}
+		
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
new file mode 100644
index 0000000..f18c431
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+
+public class DefaultBroadcastVariableInitializer<T> implements BroadcastVariableInitializer<T, List<T>> {
+
+	@Override
+	public List<T> initializeBroadcastVariable(Iterable<T> data) {
+		ArrayList<T> list = new ArrayList<T>();
+		
+		for (T value : data) {
+			list.add(value);
+		}
+		return list;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static final DefaultBroadcastVariableInitializer<Object> INSTANCE = new DefaultBroadcastVariableInitializer<Object>();
+	
+	@SuppressWarnings("unchecked")
+	public static <E> DefaultBroadcastVariableInitializer<E> instance() {
+		return (DefaultBroadcastVariableInitializer<E>) INSTANCE;
+	}
+	
+	private DefaultBroadcastVariableInitializer() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
new file mode 100644
index 0000000..45f4f47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+/**
+ * Checked exception signaling that a {@link BroadcastVariableMaterialization} was already disposed
+ * when a task tried to obtain the variable from it. The {@link BroadcastVariableManager} catches
+ * this exception and retries with a fresh materialization.
+ */
+public class MaterializationExpiredException extends Exception {
+	private static final long serialVersionUID = 7476456353634121934L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 7e692c3..63c6135 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.gates.InputGate;
 import org.apache.flink.runtime.io.network.gates.OutputGate;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.protocols.AccumulatorProtocol;
@@ -44,6 +45,7 @@ import org.apache.flink.runtime.protocols.AccumulatorProtocol;
  * splits, memory manager, etc.
  */
 public interface Environment {
+	
 	/**
 	 * Returns the ID of the job from the original job graph. It is used by the library cache manager to find the
 	 * required
@@ -52,6 +54,13 @@ public interface Environment {
 	 * @return the ID of the job from the original job graph
 	 */
 	JobID getJobID();
+	
+	/**
+	 * Gets the ID of the jobVertex that this task corresponds to.
+	 * 
+	 * @return The JobVertexID of this task.
+	 */
+	JobVertexID getJobVertexId();
 
 	/**
 	 * Returns the task configuration object which was attached to the original JobVertex.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index a7749cd..dc282b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.io.network.gates.GateID;
 import org.apache.flink.runtime.io.network.gates.InputGate;
 import org.apache.flink.runtime.io.network.gates.OutputGate;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -71,8 +72,6 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	/** The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds). */
 	private static final int SLEEPINTERVAL = 100;
 	
-	
-	
 	// --------------------------------------------------------------------------------------------
 
 	/** The task that owns this environment */
@@ -208,6 +207,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	public JobID getJobID() {
 		return this.owner.getJobID();
 	}
+	
+	@Override
+	public JobVertexID getJobVertexId() {
+		return this.owner.getVertexID();
+	}
 
 	@Override
 	public GateID getNextUnboundInputGateID() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index d400908..65afc6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -130,12 +130,18 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 			// re-read the iterative broadcast variables
 			for (int i : this.iterativeBroadcastInputs) {
 				final String name = getTaskConfig().getBroadcastInputName(i);
-				readAndSetBroadcastInput(i, name, this.runtimeUdfContext);
+				readAndSetBroadcastInput(i, name, this.runtimeUdfContext, superstepNum);
 			}
 		}
 
 		// call the parent to execute the superstep
 		super.run();
+		
+		// release the iterative broadcast variables
+		for (int i : this.iterativeBroadcastInputs) {
+			final String name = getTaskConfig().getBroadcastInputName(i);
+			releaseBroadcastVariables(name, superstepNum, this.runtimeUdfContext);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index e466e75..f7db9d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBro
 import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
 import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.util.Collector;
 
 /**
@@ -39,8 +38,7 @@ import org.apache.flink.util.Collector;
  * <p/>
  * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
  */
-public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT>
-		implements PactTaskContext<S, OT> {
+public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
 
 	private static final Logger log = LoggerFactory.getLogger(IterationTailPactTask.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 35908a9..5cb406b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.slf4j.Logger;
@@ -30,11 +29,11 @@ import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.accumulators.AccumulatorEvent;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -80,7 +79,7 @@ import java.util.Map;
 public class RegularPactTask<S extends Function, OT> extends AbstractInvokable implements PactTaskContext<S, OT> {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(RegularPactTask.class);
-
+	
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -127,13 +126,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	protected MutableObjectIterator<?>[] inputIterators;
 
 	/**
-	 * The input readers for the configured broadcast variables, wrapped in an iterator. 
-	 * Prior to the local strategies, etc...
+	 * The indices of the iterative inputs. Empty, if the task is not iterative. 
 	 */
-	protected MutableObjectIterator<?>[] broadcastInputIterators;
-	
 	protected int[] iterativeInputs;
 	
+	/**
+	 * The indices of the iterative broadcast inputs. Empty, if none of the inputs is iterative.
+	 */
 	protected int[] iterativeBroadcastInputs;
 	
 	/**
@@ -210,6 +209,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	protected volatile boolean running = true;
 
+	
 	// --------------------------------------------------------------------------------------------
 	//                                  Task Interface
 	// --------------------------------------------------------------------------------------------
@@ -334,7 +334,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 				}
 				
 				initLocalStrategies(numInputs);
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				throw new RuntimeException("Initializing the input processing failed" +
 					e.getMessage() == null ? "." : ": " + e.getMessage(), e);
 			}
@@ -349,10 +350,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			// pre main-function initialization
 			initialize();
 
-			// read the broadcast variables
+			// read the broadcast variables. they will be released in the finally clause 
 			for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
 				final String name = this.config.getBroadcastInputName(i);
-				readAndSetBroadcastInput(i, name, this.runtimeUdfContext);
+				readAndSetBroadcastInput(i, name, this.runtimeUdfContext, 1 /* superstep one for the start */);
 			}
 
 			// the work goes here
@@ -420,24 +421,33 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		}
 	}
 	
-	protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, RuntimeUDFContext context) throws IOException {
-		// drain the broadcast inputs
-
-		@SuppressWarnings("unchecked")
-		final MutableObjectIterator<X> reader =  (MutableObjectIterator<X>) this.broadcastInputIterators[inputNum];
+	protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, RuntimeUDFContext context, int superstep) throws IOException {
+		
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(formatLogString("Setting broadcast variable '" + bcVarName + "'" + 
+				(superstep > 1 ? ", superstep " + superstep : "")));
+		}
 		
 		@SuppressWarnings("unchecked")
-		final TypeSerializer<X> serializer =  (TypeSerializer<X>) this.broadcastInputSerializers[inputNum].getSerializer();
+		final TypeSerializerFactory<X> serializerFactory =  (TypeSerializerFactory<X>) this.broadcastInputSerializers[inputNum];
+		
+		final MutableReader<?> reader = this.broadcastInputReaders[inputNum];
 
-		ArrayList<X> collection = new ArrayList<X>();
+		List<X> variable = BroadcastVariableManager.INSTANCE.getBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
 		
-		X record = serializer.createInstance();
-		while (this.running && ((record = reader.next(record)) != null)) {
-			collection.add(record);
-			record = serializer.createInstance();
+		context.setBroadcastVariable(bcVarName, variable);
+	}
+	
+	protected void releaseBroadcastVariables(String bcVarName, int superstep, RuntimeUDFContext context) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(formatLogString("Releasing broadcast variable '" + bcVarName + "'" + 
+				(superstep > 1 ? ", superstep " + superstep : "")));
 		}
-		context.setBroadcastVariable(bcVarName, collection);
+		
+		BroadcastVariableManager.INSTANCE.releaseReference(bcVarName, superstep, this);
+		context.clearBroadcastVariable(bcVarName);
 	}
+	
 
 	protected void run() throws Exception {
 		// ---------------------------- Now, the actual processing starts ------------------------
@@ -515,7 +525,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 				catch (Throwable t) {}
 			}
 			
-			// if resettable driver invoke treardown
+			// if resettable driver invoke teardown
 			if (this.driver instanceof ResettablePactDriver) {
 				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
 				try {
@@ -594,6 +604,19 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	}
 
 	protected void closeLocalStrategiesAndCaches() {
+		
+		// make sure that all broadcast variable references held by this task are released
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(formatLogString("Releasing all broadcast variables."));
+		}
+		
+		BroadcastVariableManager.INSTANCE.releaseAllReferencesFromTask(this);
+		if (runtimeUdfContext != null) {
+			runtimeUdfContext.clearAllBroadcastVariables();
+		}
+		
+		// clean all local strategies and caches/pipeline breakers. 
+		
 		if (this.localStrategies != null) {
 			for (int i = 0; i < this.localStrategies.length; i++) {
 				if (this.localStrategies[i] != null) {
@@ -786,7 +809,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception {
 		this.broadcastInputSerializers = new TypeSerializerFactory[numBroadcastInputs];
-		this.broadcastInputIterators = new MutableObjectIterator[numBroadcastInputs];
 
 		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 
@@ -794,8 +816,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			//  ---------------- create the serializer first ---------------------
 			final TypeSerializerFactory<?> serializerFactory = this.config.getBroadcastInputSerializer(i, userCodeClassLoader);
 			this.broadcastInputSerializers[i] = serializerFactory;
-
-			this.broadcastInputIterators[i] = createInputIterator(this.broadcastInputReaders[i], this.broadcastInputSerializers[i]);
 		}
 	}
 
@@ -1493,6 +1513,4 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		}
 		return a;
 	}
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index ee1a5b7..bd32f10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordD
 import org.apache.flink.runtime.io.network.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -345,4 +346,9 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	public Map<String, FutureTask<Path>> getCopyTask() {
 		return null;
 	}
+
+	@Override
+	public JobVertexID getJobVertexId() {
+		return new JobVertexID(new byte[16]);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
index 81950f2..08cded2 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.Visitor;
 /**
  * Utility to get operator instances from plans via name.
  */
+@SuppressWarnings("deprecation")
 public class OperatorResolver implements Visitor<Operator<?>> {
 	
 	private final Map<String, List<Operator<?>>> map;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 105943c..8460382 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -40,6 +40,7 @@ import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
 
@@ -101,6 +102,9 @@ public abstract class AbstractTestBase {
 		} finally {
 			deleteAllTempFiles();
 		}
+		
+		Assert.assertEquals("Not all broadcast variables were released.",
+				0, BroadcastVariableManager.INSTANCE.getNumberOfVariablesWithReferences());
 	}
 
 	//------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
index 944d999..a274432 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -19,7 +19,9 @@
 package org.apache.flink.test.broadcastvars;
 
 import java.io.BufferedReader;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -198,7 +200,13 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
-			this.models = this.getRuntimeContext().getBroadcastVariable("models");
+			List<Record> shared = this.getRuntimeContext().getBroadcastVariable("models");
+			this.models = new ArrayList<Record>(shared.size());
+			synchronized (shared) {
+				for (Record r : shared) {
+					this.models.add(r.createCopy());
+				}
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 8227f97..afa38b5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -76,6 +76,7 @@ import org.junit.runners.Parameterized.Parameters;
  * - intermediate workset update and solution set tail
  * - intermediate solution set update and workset tail
  */
+@SuppressWarnings("deprecation")
 @RunWith(Parameterized.class)
 public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
index 61c5767..e5f519d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.recordJobs.kmeans;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -47,7 +46,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
-
+@SuppressWarnings("deprecation")
 public class KMeansBroadcast implements Program, ProgramDescription {
 	
 	private static final long serialVersionUID = 1L;
@@ -197,11 +196,13 @@ public class KMeansBroadcast implements Program, ProgramDescription {
 		 */
 		@Override
 		public void open(Configuration parameters) throws Exception {
-			Collection<Record> clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
+			List<Record> clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
 			
 			centers.clear();
-			for (Record r : clusterCenters) {
-				centers.add(new PointWithId(r.getField(0, IntValue.class).getValue(), r.getField(1, Point.class)));
+			synchronized (clusterCenters) {
+				for (Record r : clusterCenters) {
+					centers.add(new PointWithId(r.getField(0, IntValue.class).getValue(), r.getField(1, Point.class)));
+				}
 			}
 		}