You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/04 11:06:38 UTC
[4/6] git commit: [FLINK-1198] Broadcast variables are shared between
tasks in the same TaskManager
[FLINK-1198] Broadcast variables are shared between tasks in the same TaskManager
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cc766533
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cc766533
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cc766533
Branch: refs/heads/master
Commit: cc766533e09c8b59cfb951c86c2e3989720fbeb9
Parents: bbd8209
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 27 13:23:02 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 20:58:49 2014 +0100
----------------------------------------------------------------------
.../functions/BroadcastVariableInitializer.java | 24 ++
.../api/common/functions/RuntimeContext.java | 28 +-
.../functions/util/RuntimeUDFContext.java | 117 ++++++---
.../java/org/apache/flink/util/NetUtils.java | 12 +-
.../flink/util/TraversableOnceIterable.java | 42 +++
.../functions/util/RuntimeUDFContextTest.java | 173 +++++++++++++
.../operators/translation/WrappingFunction.java | 120 +--------
.../runtime/broadcast/BroadcastVariableKey.java | 81 ++++++
.../broadcast/BroadcastVariableManager.java | 113 ++++++++
.../BroadcastVariableMaterialization.java | 256 +++++++++++++++++++
.../DefaultBroadcastVariableInitializer.java | 48 ++++
.../MaterializationExpiredException.java | 23 ++
.../flink/runtime/execution/Environment.java | 9 +
.../runtime/execution/RuntimeEnvironment.java | 8 +-
.../task/AbstractIterativePactTask.java | 8 +-
.../iterative/task/IterationTailPactTask.java | 4 +-
.../runtime/operators/RegularPactTask.java | 74 ++++--
.../operators/testutils/MockEnvironment.java | 6 +
.../test/compiler/util/OperatorResolver.java | 1 +
.../flink/test/util/AbstractTestBase.java | 4 +
.../BroadcastVarsNepheleITCase.java | 10 +-
.../ConnectedComponentsNepheleITCase.java | 1 +
.../test/recordJobs/kmeans/KMeansBroadcast.java | 11 +-
23 files changed, 977 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
new file mode 100644
index 0000000..7fc81e5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+public interface BroadcastVariableInitializer<T, O> {
+
+ O initializeBroadcastVariable(Iterable<T> data);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 5f6aaa5..7459a5a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -119,11 +119,37 @@ public interface RuntimeContext {
/**
* Returns the result bound to the broadcast variable identified by the
* given {@code name}.
+ * <p>
+ * IMPORTANT: The broadcast variable data structure is shared between the parallel
+ * tasks on one machine. Any access that modifies its internal state needs to
+ * be manually synchronized by the caller.
+ *
+ * @param name The name under which the broadcast variable is registered.
+ * @return The broadcast variable, materialized as a list of elements.
*/
<RT> List<RT> getBroadcastVariable(String name);
+
+ /**
+ * Returns the result bound to the broadcast variable identified by the
+ * given {@code name}. The broadcast variable is returned as a shared data structure
+ * that is initialized with the given {@link BroadcastVariableInitializer}.
+ * <p>
+ * IMPORTANT: The broadcast variable data structure is shared between the parallel
+ * tasks on one machine. Any access that modifies its internal state needs to
+ * be manually synchronized by the caller.
+ *
+ * @param name The name under which the broadcast variable is registered.
+ * @param initializer The initializer that creates the shared data structure of the broadcast
+ * variable from the sequence of elements.
+ * @return The broadcast variable, as the shared data structure created by the given initializer.
+ */
+ <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer);
/**
- * Returns the distributed cache to get the local tmp file.
+ * Returns the {@link DistributedCache} to get the local temporary file copies of files otherwise not
+ * locally accessible.
+ *
+ * @return The distributed cache of the worker executing this instance.
*/
DistributedCache getDistributedCache();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index c3d9076..95718f5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -30,11 +30,12 @@ import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.fs.Path;
/**
- * Implementation of the {@link RuntimeContext}, created by runtime UDF operators.
+ * A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
*/
public class RuntimeUDFContext implements RuntimeContext {
@@ -45,20 +46,23 @@ public class RuntimeUDFContext implements RuntimeContext {
private final int subtaskIndex;
private final ClassLoader userCodeClassLoader;
-
- private final DistributedCache distributedCache = new DistributedCache();
private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
+
+ private final DistributedCache distributedCache = new DistributedCache();
- private final HashMap<String, List<?>> broadcastVars = new HashMap<String, List<?>>();
-
+ private final HashMap<String, Object> initializedBroadcastVars = new HashMap<String, Object>();
+
+ private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
+
+
public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
this.name = name;
this.numParallelSubtasks = numParallelSubtasks;
this.subtaskIndex = subtaskIndex;
this.userCodeClassLoader = userCodeClassLoader;
}
-
+
public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> cpTasks) {
this.name = name;
this.numParallelSubtasks = numParallelSubtasks;
@@ -66,6 +70,7 @@ public class RuntimeUDFContext implements RuntimeContext {
this.userCodeClassLoader = userCodeClassLoader;
this.distributedCache.setCopyTasks(cpTasks);
}
+
@Override
public String getTaskName() {
@@ -117,6 +122,72 @@ public class RuntimeUDFContext implements RuntimeContext {
return (Accumulator<V, A>) accumulators.get(name);
}
+ @Override
+ public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {
+ return this.accumulators;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <RT> List<RT> getBroadcastVariable(String name) {
+
+ // check if we have an initialized version
+ Object o = this.initializedBroadcastVars.get(name);
+ if (o != null) {
+ if (o instanceof List) {
+ return (List<RT>) o;
+ }
+ else {
+ throw new IllegalStateException("The broadcast variable with name '" + name +
+ "' is not a List. A different call must have requested this variable with a BroadcastVariableInitializer.");
+ }
+ }
+ else {
+ List<?> uninitialized = this.uninitializedBroadcastVars.remove(name);
+ if (uninitialized != null) {
+ this.initializedBroadcastVars.put(name, uninitialized);
+ return (List<RT>) uninitialized;
+ }
+ else {
+ throw new IllegalArgumentException("The broadcast variable with name '" + name + "' has not been set.");
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
+
+ // check if we have an initialized version
+ Object o = this.initializedBroadcastVars.get(name);
+ if (o != null) {
+ return (C) o;
+ }
+ else {
+ List<T> uninitialized = (List<T>) this.uninitializedBroadcastVars.remove(name);
+ if (uninitialized != null) {
+ C result = initializer.initializeBroadcastVariable(uninitialized);
+ this.initializedBroadcastVars.put(name, result);
+ return result;
+ }
+ else {
+ throw new IllegalArgumentException("The broadcast variable with name '" + name + "' has not been set.");
+ }
+ }
+ }
+
+ @Override
+ public DistributedCache getDistributedCache() {
+ return this.distributedCache;
+ }
+
+ @Override
+ public ClassLoader getUserCodeClassLoader() {
+ return this.userCodeClassLoader;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
@SuppressWarnings("unchecked")
private <V, A> Accumulator<V, A> getAccumulator(String name,
Class<? extends Accumulator<V, A>> accumulatorClass) {
@@ -138,34 +209,18 @@ public class RuntimeUDFContext implements RuntimeContext {
}
return (Accumulator<V, A>) accumulator;
}
-
- @Override
- public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {
- return this.accumulators;
- }
-
+
public void setBroadcastVariable(String name, List<?> value) {
- this.broadcastVars.put(name, value);
- }
-
-
- @Override
- @SuppressWarnings("unchecked")
- public <RT> List<RT> getBroadcastVariable(String name) {
- if (!this.broadcastVars.containsKey(name)) {
- throw new IllegalArgumentException("Trying to access an unbound broadcast variable '"
- + name + "'.");
- }
- return (List<RT>) this.broadcastVars.get(name);
+ this.uninitializedBroadcastVars.put(name, value);
}
-
- @Override
- public DistributedCache getDistributedCache() {
- return this.distributedCache;
+
+ public void clearBroadcastVariable(String name) {
+ this.uninitializedBroadcastVars.remove(name);
+ this.initializedBroadcastVars.remove(name);
}
- @Override
- public ClassLoader getUserCodeClassLoader() {
- return this.userCodeClassLoader;
+ public void clearAllBroadcastVariables() {
+ this.uninitializedBroadcastVars.clear();
+ this.initializedBroadcastVars.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index d2fcab8..c4538ec 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -19,15 +19,17 @@ package org.apache.flink.util;
public class NetUtils {
+
/**
- * Turn a fully qualified domain name (fqdn) into a hostname.
+ * Turn a fully qualified domain name (fqdn) into a hostname. If the fqdn has multiple subparts
+ * (separated by a period '.'), it will take the first part. Otherwise it takes the entire fqdn.
*
- * @param fqdn
- * @return
+ * @param fqdn The fully qualified domain name.
+ * @return The hostname.
*/
public static String getHostnameFromFQDN(String fqdn) {
- if(fqdn == null) {
- throw new IllegalArgumentException("Input string is null (fqdn)");
+ if (fqdn == null) {
+ throw new IllegalArgumentException("fqdn is null");
}
int dotPos = fqdn.indexOf('.');
if(dotPos == -1) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
new file mode 100644
index 0000000..73e3cd6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceIterable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.Iterator;
+
+public class TraversableOnceIterable<T> implements Iterable<T> {
+
+ private final Iterator<T> iterator;
+
+ public TraversableOnceIterable(Iterator<T> iterator) {
+ if (iterator == null) {
+ throw new NullPointerException("The iterator must not be null.");
+ }
+ this.iterator = iterator;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ if (iterator != null) {
+ return iterator;
+ } else {
+ throw new TraversableOnceException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
new file mode 100644
index 0000000..671c3fd
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.junit.Test;
+
+
+public class RuntimeUDFContextTest {
+
+ @Test
+ public void testBroadcastVariableNotFound() {
+ try {
+ RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader());
+
+ try {
+ ctx.getBroadcastVariable("some name");
+ fail("should throw an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ ctx.getBroadcastVariableWithInitializer("some name", new BroadcastVariableInitializer<Object, Object>() {
+ public Object initializeBroadcastVariable(Iterable<Object> data) { return null; }
+ });
+
+ fail("should throw an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBroadcastVariableSimple() {
+ try {
+ RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader());
+
+ ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
+ ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+ List<Integer> list1 = ctx.getBroadcastVariable("name1");
+ List<Double> list2 = ctx.getBroadcastVariable("name2");
+
+ assertEquals(Arrays.asList(1, 2, 3, 4), list1);
+ assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list2);
+
+ // access again
+ List<Integer> list3 = ctx.getBroadcastVariable("name1");
+ List<Double> list4 = ctx.getBroadcastVariable("name2");
+
+ assertEquals(Arrays.asList(1, 2, 3, 4), list3);
+ assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list4);
+
+ // and again ;-)
+ List<Integer> list5 = ctx.getBroadcastVariable("name1");
+ List<Double> list6 = ctx.getBroadcastVariable("name2");
+
+ assertEquals(Arrays.asList(1, 2, 3, 4), list5);
+ assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list6);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBroadcastVariableWithInitializer() {
+ try {
+ RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader());
+
+ ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+
+ // access it the first time with an initializer
+ List<Double> list = ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
+ assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list);
+
+ // access it the second time with an initializer (which might not get executed)
+ List<Double> list2 = ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
+ assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list2);
+
+ // access it the third time without an initializer (should work by "chance", because the result is a list)
+ List<Double> list3 = ctx.getBroadcastVariable("name");
+ assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list3);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBroadcastVariableWithInitializerAndMismatch() {
+ try {
+ RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader());
+
+ ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+
+ // access it the first time with an initializer
+ int sum = ctx.getBroadcastVariableWithInitializer("name", new SumInitializer());
+ assertEquals(10, sum);
+
+ // access it the second time with no initializer -> should fail due to type mismatch
+ try {
+ ctx.getBroadcastVariable("name");
+ fail("should throw an exception");
+ }
+ catch (IllegalStateException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final class ConvertingInitializer implements BroadcastVariableInitializer<Integer, List<Double>> {
+ @Override
+ public List<Double> initializeBroadcastVariable(Iterable<Integer> data) {
+ List<Double> list = new ArrayList<Double>();
+
+ for (Integer i : data) {
+ list.add(i.doubleValue());
+ }
+ return list;
+ }
+ }
+
+ private static final class SumInitializer implements BroadcastVariableInitializer<Integer, Integer> {
+ @Override
+ public Integer initializeBroadcastVariable(Iterable<Integer> data) {
+ int sum = 0;
+
+ for (Integer i : data) {
+ sum += i;
+ }
+ return sum;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 391ef7f..5a2ab4c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -18,24 +18,11 @@
package org.apache.flink.api.java.operators.translation;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Value;
public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {
@@ -63,115 +50,10 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
public void setRuntimeContext(RuntimeContext t) {
super.setRuntimeContext(t);
- if (t instanceof IterationRuntimeContext) {
- FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingIterationRuntimeContext(t));
- }
- else{
- FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingRuntimeContext(t));
- }
+ FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
}
public T getWrappedFunction () {
return this.wrappedFunction;
}
-
-
- private static class WrappingRuntimeContext implements RuntimeContext {
-
- protected final RuntimeContext context;
-
-
- protected WrappingRuntimeContext(RuntimeContext context) {
- this.context = context;
- }
-
- @Override
- public String getTaskName() {
- return context.getTaskName();
- }
-
- @Override
- public int getNumberOfParallelSubtasks() {
- return context.getNumberOfParallelSubtasks();
- }
-
- @Override
- public int getIndexOfThisSubtask() {
- return context.getIndexOfThisSubtask();
- }
-
-
- @Override
- public <V, A> void addAccumulator(String name, Accumulator<V, A> accumulator) {
- context.<V, A>addAccumulator(name, accumulator);
- }
-
- @Override
- public <V, A> Accumulator<V, A> getAccumulator(String name) {
- return context.<V, A>getAccumulator(name);
- }
-
- @Override
- public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {
- return context.getAllAccumulators();
- }
-
- @Override
- public IntCounter getIntCounter(String name) {
- return context.getIntCounter(name);
- }
-
- @Override
- public LongCounter getLongCounter(String name) {
- return context.getLongCounter(name);
- }
-
- @Override
- public DoubleCounter getDoubleCounter(String name) {
- return context.getDoubleCounter(name);
- }
-
- @Override
- public Histogram getHistogram(String name) {
- return context.getHistogram(name);
- }
-
- @Override
- public <RT> List<RT> getBroadcastVariable(String name) {
- List<RT> refColl = context.getBroadcastVariable(name);
- return new ArrayList<RT>(refColl);
- }
-
- @Override
- public DistributedCache getDistributedCache() {
- return context.getDistributedCache();
- }
-
- @Override
- public ClassLoader getUserCodeClassLoader() {
- return context.getUserCodeClassLoader();
- }
- }
-
- private static class WrappingIterationRuntimeContext extends WrappingRuntimeContext implements IterationRuntimeContext {
-
- protected WrappingIterationRuntimeContext(RuntimeContext context) {
- super(context);
- }
-
- @Override
- public int getSuperstepNumber() {
- return ((IterationRuntimeContext) context).getSuperstepNumber();
- }
-
- @Override
- public <T extends Aggregator<?>> T getIterationAggregator(String name) {
- return ((IterationRuntimeContext) context).<T>getIterationAggregator(name);
- }
-
- @Override
- public <T extends Value> T getPreviousIterationAggregate(String name) {
- return ((IterationRuntimeContext) context).<T>getPreviousIterationAggregate(name);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
new file mode 100644
index 0000000..cfa87ae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+public class BroadcastVariableKey {
+
+ private final JobVertexID vertexId;
+
+ private final String name;
+
+ private final int superstep;
+
+ public BroadcastVariableKey(JobVertexID vertexId, String name, int superstep) {
+ if (vertexId == null || name == null || superstep <= 0) {
+ throw new IllegalArgumentException();
+ }
+
+ this.vertexId = vertexId;
+ this.name = name;
+ this.superstep = superstep;
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ public JobVertexID getVertexId() {
+ return vertexId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getSuperstep() {
+ return superstep;
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return 31 * superstep +
+ 47 * name.hashCode() +
+ 83 * vertexId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj.getClass() == BroadcastVariableKey.class) {
+ BroadcastVariableKey other = (BroadcastVariableKey) obj;
+ return this.superstep == other.superstep &&
+ this.name.equals(other.name) &&
+ this.vertexId.equals(other.vertexId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return vertexId + " \"" + name + "\" (" + superstep + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
new file mode 100644
index 0000000..3cf9722
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.operators.RegularPactTask;
+
+/**
+ * Keeps track of the broadcast variable materializations that are shared between tasks.
+ * Materializations are registered under a {@link BroadcastVariableKey} and are removed
+ * again once the last task has released its reference.
+ */
+public class BroadcastVariableManager {
+
+    /** The singleton instance that keeps track of the shared broadcast variables. */
+    public static final BroadcastVariableManager INSTANCE = new BroadcastVariableManager();
+
+    /** Prevent external instantiation. */
+    private BroadcastVariableManager() {}
+
+
+    // --------------------------------------------------------------------------------------------
+
+    /** All currently registered materializations, by their key. */
+    private final ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?>> variables =
+            new ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?>>(16);
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Gets the materialized broadcast variable for the given name and superstep and registers
+     * the given task as a holder of a reference to it. If no materialization is registered
+     * under the key yet, a new one is registered.
+     *
+     * @param name The name of the broadcast variable.
+     * @param superstep The superstep for which the variable is requested.
+     * @param holder The task that requests the variable and will hold the reference.
+     * @param reader The reader for the task's broadcast input.
+     * @param serializerFactory The factory for the serializer of the variable's elements.
+     * @return The list with the elements of the broadcast variable.
+     * @throws IOException Thrown, if the materialization of the variable fails.
+     */
+    public <T> List<T> getBroadcastVariable(String name, int superstep, RegularPactTask<?, ?> holder,
+            MutableReader<?> reader, TypeSerializerFactory<T> serializerFactory) throws IOException
+    {
+        final BroadcastVariableKey key = new BroadcastVariableKey(holder.getEnvironment().getJobVertexId(), name, superstep);
+
+        while (true) {
+            final BroadcastVariableMaterialization<T> newMat = new BroadcastVariableMaterialization<T>(key);
+
+            final BroadcastVariableMaterialization<?> previous = variables.putIfAbsent(key, newMat);
+
+            @SuppressWarnings("unchecked")
+            final BroadcastVariableMaterialization<T> materialization = (previous == null) ? newMat : (BroadcastVariableMaterialization<T>) previous;
+
+            try {
+                return materialization.getMaterializedVariable(reader, serializerFactory, holder);
+            }
+            catch (MaterializationExpiredException e) {
+                // concurrent release. as an optimization, try to replace the expired entry with a fresh one.
+                // otherwise we might spin for a while until the releaser removes the variable.
+                // NOTE: This would also catch a bug that prevented an expired materialization from ever
+                // being removed, so it acts as a future safeguard.
+
+                // 'previous' is null when our own 'newMat' was registered and expired in the meantime.
+                // ConcurrentHashMap.replace() rejects null arguments, so we always replace the
+                // materialization that actually expired. An expired 'newMat' cannot be reused.
+                final BroadcastVariableMaterialization<T> replacement =
+                        (materialization == newMat) ? new BroadcastVariableMaterialization<T>(key) : newMat;
+
+                if (variables.replace(key, materialization, replacement)) {
+                    try {
+                        return replacement.getMaterializedVariable(reader, serializerFactory, holder);
+                    }
+                    catch (MaterializationExpiredException ee) {
+                        // can still happen in cases of extreme races and fast tasks
+                        // fall through the loop;
+                    }
+                }
+                // else fall through the loop
+            }
+        }
+    }
+
+
+    /**
+     * Releases the reference that the given task holds to the broadcast variable with the
+     * given name and superstep.
+     *
+     * @param name The name of the broadcast variable.
+     * @param superstep The superstep of the broadcast variable.
+     * @param referenceHolder The task that holds the reference.
+     */
+    public void releaseReference(String name, int superstep, RegularPactTask<?, ?> referenceHolder) {
+        BroadcastVariableKey key = new BroadcastVariableKey(referenceHolder.getEnvironment().getJobVertexId(), name, superstep);
+        releaseReference(key, referenceHolder);
+    }
+
+    /**
+     * Releases the reference that the given task holds to the broadcast variable registered
+     * under the given key.
+     *
+     * @param key The key of the broadcast variable.
+     * @param referenceHolder The task that holds the reference.
+     * @throws IllegalStateException Thrown, if no broadcast variable is registered under the key.
+     */
+    public void releaseReference(BroadcastVariableKey key, RegularPactTask<?, ?> referenceHolder) {
+        BroadcastVariableMaterialization<?> mat = variables.get(key);
+
+        // fail with a descriptive error rather than a NullPointerException
+        if (mat == null) {
+            throw new IllegalStateException("No broadcast variable is registered under the key " + key + '.');
+        }
+
+        // release this reference
+        if (mat.decrementReference(referenceHolder)) {
+            // remove if no one holds a reference and no one concurrently replaced the entry
+            variables.remove(key, mat);
+        }
+    }
+
+
+    /**
+     * Releases every reference that the given task holds to any registered broadcast variable.
+     * Variables that the task holds no reference to are left untouched.
+     *
+     * @param referenceHolder The task whose references are released.
+     */
+    public void releaseAllReferencesFromTask(RegularPactTask<?, ?> referenceHolder) {
+        // go through all registered variables
+        for (Map.Entry<BroadcastVariableKey, BroadcastVariableMaterialization<?>> entry : variables.entrySet()) {
+            BroadcastVariableMaterialization<?> mat = entry.getValue();
+
+            // release the reference
+            if (mat.decrementReferenceIfHeld(referenceHolder)) {
+                // remove if no one holds a reference and no one concurrently replaced the entry
+                variables.remove(entry.getKey(), mat);
+            }
+        }
+    }
+
+    /**
+     * @return The number of materializations that are currently registered.
+     */
+    public int getNumberOfVariablesWithReferences() {
+        return this.variables.size();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
new file mode 100644
index 0000000..fce29de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.util.ReaderIterator;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The materialized contents of one broadcast variable, together with the set of tasks
+ * that currently hold a reference to them.
+ *
+ * <p>The first task that registers itself reads its broadcast input into a list. Every task
+ * that registers while other references exist drains its own input, waits until the list
+ * is available, and shares it. Once the last reference is released, the materialization is
+ * marked as disposed and drops its data.
+ *
+ * @param <T> The type of the elements in the broadcasted data set.
+ */
+public class BroadcastVariableMaterialization<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BroadcastVariableMaterialization.class);
+
+
+    /** The tasks that hold a reference. Also serves as the lock for 'disposed'. */
+    private final Set<RegularPactTask<?, ?>> references = new HashSet<RegularPactTask<?,?>>();
+
+    /** Monitor on which successor tasks wait for the materialization to complete. */
+    private final Object materializationMonitor = new Object();
+
+    private final BroadcastVariableKey key;
+
+    private ArrayList<T> data;
+
+    /** Set under the materialization monitor once the data is available. */
+    private boolean materialized;
+
+    /** Set under the materialization monitor when the materializing task failed. */
+    private boolean materializationFailed;
+
+    private boolean disposed;
+
+
+    public BroadcastVariableMaterialization(BroadcastVariableKey key) {
+        this.key = key;
+    }
+
+
+    /**
+     * Registers the given task as a reference holder and returns the materialized variable.
+     * The task that registers as the only reference holder reads the data from its reader;
+     * every other task discards the data from its reader and waits for the shared list.
+     * If the call fails, the reference of the given task is released again.
+     *
+     * @param reader The reader for the task's broadcast input.
+     * @param serializerFactory The factory for the serializer of the elements.
+     * @param referenceHolder The task that requests the variable.
+     * @return The list with the elements of the broadcast variable.
+     *
+     * @throws MaterializationExpiredException Thrown, if this materialization was already disposed.
+     * @throws IOException Thrown, if reading the data fails, or if the task that
+     *                     materializes the variable failed.
+     * @throws IllegalStateException Thrown, if the task already holds a reference.
+     */
+    public List<T> getMaterializedVariable(MutableReader<?> reader, TypeSerializerFactory<?> serializerFactory, RegularPactTask<?, ?> referenceHolder)
+            throws MaterializationExpiredException, IOException
+    {
+        Preconditions.checkNotNull(reader);
+        Preconditions.checkNotNull(serializerFactory);
+        Preconditions.checkNotNull(referenceHolder);
+
+        final boolean materializer;
+
+        // hold the reference lock only while we track references and decide who should be the materializer
+        // that way, other tasks can de-register (in case of failure) while materialization is happening
+        synchronized (references) {
+            if (disposed) {
+                throw new MaterializationExpiredException();
+            }
+
+            // sanity check
+            if (!references.add(referenceHolder)) {
+                throw new IllegalStateException(
+                        String.format("The task %s (%d/%d) already holds a reference to the broadcast variable %s.",
+                                referenceHolder.getEnvironment().getTaskName(),
+                                referenceHolder.getEnvironment().getIndexInSubtaskGroup() + 1,
+                                referenceHolder.getEnvironment().getCurrentNumberOfSubtasks(),
+                                key.toString()));
+            }
+
+            materializer = references.size() == 1;
+        }
+
+        try {
+            @SuppressWarnings("unchecked")
+            final MutableReader<DeserializationDelegate<T>> typedReader = (MutableReader<DeserializationDelegate<T>>) reader;
+            @SuppressWarnings("unchecked")
+            final TypeSerializer<T> serializer = ((TypeSerializerFactory<T>) serializerFactory).getSerializer();
+
+            final ReaderIterator<T> readerIterator = new ReaderIterator<T>(typedReader, serializer);
+
+            if (materializer) {
+                // first one, so we need to materialize;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Getting Broadcast Variable (" + key + ") - First access, materializing.");
+                }
+
+                final ArrayList<T> elements = new ArrayList<T>();
+
+                // every element gets its own instance, because the elements are retained
+                T element;
+                while ((element = readerIterator.next(serializer.createInstance())) != null) {
+                    elements.add(element);
+                }
+
+                synchronized (materializationMonitor) {
+                    this.data = elements;
+                    this.materialized = true;
+                    materializationMonitor.notifyAll();
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Materialization of Broadcast Variable (" + key + ") finished.");
+                }
+            }
+            else {
+                // successor: discard all data and refer to the shared variable
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Getting Broadcast Variable (" + key + ") - shared access.");
+                }
+
+                // the elements are discarded, so a single instance can be reused
+                T element = serializer.createInstance();
+                while ((element = readerIterator.next(element)) != null) {
+                    // discard
+                }
+
+                synchronized (materializationMonitor) {
+                    // also wake up when the materializer failed, otherwise we would wait forever
+                    while (!this.materialized && !this.materializationFailed) {
+                        materializationMonitor.wait();
+                    }
+
+                    if (!this.materialized) {
+                        throw new IOException("Materialization of the broadcast variable failed in the materializing task.");
+                    }
+                }
+            }
+
+            return this.data;
+        }
+        catch (Throwable t) {
+            if (materializer) {
+                // nobody else will materialize the data, so release the successors that wait for it
+                synchronized (materializationMonitor) {
+                    this.materializationFailed = true;
+                    materializationMonitor.notifyAll();
+                }
+            }
+
+            if (t instanceof InterruptedException) {
+                // the exception gets wrapped below, so preserve the interruption for the caller
+                Thread.currentThread().interrupt();
+            }
+
+            // in case of an exception, we need to clean up big time
+            decrementReferenceIfHeld(referenceHolder);
+
+            if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("Materialization of the broadcast variable failed.", t);
+            }
+        }
+    }
+
+    /**
+     * Releases the reference of the given task. Fails, if the task holds no reference, or if
+     * the materialization is no longer alive.
+     *
+     * @param referenceHolder The task that holds the reference.
+     * @return True, if this call released the last reference and disposed the materialization.
+     * @throws IllegalStateException Thrown, if there was no reference to release.
+     */
+    public boolean decrementReference(RegularPactTask<?, ?> referenceHolder) {
+        return decrementReferenceInternal(referenceHolder, true);
+    }
+
+    /**
+     * Releases the reference of the given task, if the task holds one. Does nothing otherwise.
+     *
+     * @param referenceHolder The task that may hold a reference.
+     * @return True, if this call released the last reference and disposed the materialization.
+     */
+    public boolean decrementReferenceIfHeld(RegularPactTask<?, ?> referenceHolder) {
+        return decrementReferenceInternal(referenceHolder, false);
+    }
+
+    /**
+     * Removes the reference of the given task and disposes the materialization when no
+     * reference is left.
+     *
+     * @param referenceHolder The task whose reference is removed.
+     * @param errorIfNoReference Whether a missing reference is an error, or is silently ignored.
+     * @return True, if this call released the last reference and disposed the materialization.
+     */
+    private boolean decrementReferenceInternal(RegularPactTask<?, ?> referenceHolder, boolean errorIfNoReference) {
+        synchronized (references) {
+            if (disposed || references.isEmpty()) {
+                if (errorIfNoReference) {
+                    throw new IllegalStateException("Decrementing reference to broadcast variable that is no longer alive.");
+                } else {
+                    return false;
+                }
+            }
+
+            if (!references.remove(referenceHolder)) {
+                if (errorIfNoReference) {
+                    throw new IllegalStateException(
+                            String.format("The task %s (%d/%d) did not hold a reference to the broadcast variable %s.",
+                                    referenceHolder.getEnvironment().getTaskName(),
+                                    referenceHolder.getEnvironment().getIndexInSubtaskGroup() + 1,
+                                    referenceHolder.getEnvironment().getCurrentNumberOfSubtasks(),
+                                    key.toString()));
+                } else {
+                    return false;
+                }
+            }
+
+
+            if (references.isEmpty()) {
+                // last reference is gone: expire this materialization and drop the data
+                disposed = true;
+                data = null;
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Adapter from a {@link MutableObjectIterator} to a regular {@link Iterator}, which requests
+     * a new instance from the serializer for every element. Currently unused.
+     */
+    @SuppressWarnings("unused")
+    private static final class NewObjectCreatingIterator<T> implements Iterator<T> {
+
+        private final TypeSerializer<T> serializer;
+
+        /** The source of the elements, set to null once it is exhausted. */
+        private MutableObjectIterator<T> iterator;
+
+        /** The element that was fetched by hasNext() and not yet returned by next(). */
+        private T next;
+
+
+        public NewObjectCreatingIterator(TypeSerializer<T> serializer, MutableObjectIterator<T> iterator) {
+            this.serializer = serializer;
+            this.iterator = iterator;
+        }
+
+
+        @Override
+        public boolean hasNext() {
+            if (next != null) {
+                return true;
+            }
+            else if (iterator != null) {
+                try {
+                    next = iterator.next(serializer.createInstance());
+                    if (next != null) {
+                        return true;
+                    } else {
+                        iterator = null;
+                        return false;
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException("Error getting element for broadcast variable.", e);
+                }
+            }
+            else {
+                return false;
+            }
+        }
+
+        @Override
+        public T next() {
+            if (hasNext()) {
+                // hand out the fetched element exactly once, so the next call advances
+                final T current = next;
+                next = null;
+                return current;
+            } else {
+                throw new NoSuchElementException();
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
new file mode 100644
index 0000000..f18c431
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+
+/**
+ * A {@link BroadcastVariableInitializer} that copies all elements of the broadcast
+ * data into a list, in the order in which the given iterable returns them.
+ * Instances are obtained via {@link #instance()}.
+ *
+ * @param <T> The type of the elements of the broadcast variable.
+ */
+public class DefaultBroadcastVariableInitializer<T> implements BroadcastVariableInitializer<T, List<T>> {
+
+    /** The single instance, which is handed out for all element types. */
+    private static final DefaultBroadcastVariableInitializer<Object> INSTANCE = new DefaultBroadcastVariableInitializer<Object>();
+
+    /** Not instantiable from outside, use {@link #instance()}. */
+    private DefaultBroadcastVariableInitializer() {}
+
+    /**
+     * Gets the shared initializer, cast to the requested element type.
+     *
+     * @return The shared initializer instance.
+     */
+    @SuppressWarnings("unchecked")
+    public static <E> DefaultBroadcastVariableInitializer<E> instance() {
+        return (DefaultBroadcastVariableInitializer<E>) INSTANCE;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Collects the given elements into a newly created list.
+     *
+     * @param data The elements of the broadcast variable.
+     * @return A new list that contains all the elements.
+     */
+    @Override
+    public List<T> initializeBroadcastVariable(Iterable<T> data) {
+        final List<T> elements = new ArrayList<T>();
+        for (T element : data) {
+            elements.add(element);
+        }
+
+        return elements;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
new file mode 100644
index 0000000..45f4f47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.broadcast;
+
+/**
+ * Signals that a {@code BroadcastVariableMaterialization} was already disposed
+ * when a task requested its materialized variable.
+ */
+public class MaterializationExpiredException extends Exception {
+
+    private static final long serialVersionUID = 7476456353634121934L;
+
+    /** Creates the exception without a message and without a cause. */
+    public MaterializationExpiredException() {
+        super();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 7e692c3..63c6135 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.gates.InputGate;
import org.apache.flink.runtime.io.network.gates.OutputGate;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
@@ -44,6 +45,7 @@ import org.apache.flink.runtime.protocols.AccumulatorProtocol;
* splits, memory manager, etc.
*/
public interface Environment {
+
/**
* Returns the ID of the job from the original job graph. It is used by the library cache manager to find the
* required
@@ -52,6 +54,13 @@ public interface Environment {
* @return the ID of the job from the original job graph
*/
JobID getJobID();
+
+ /**
+ * Gets the ID of the jobVertex that this task corresponds to.
+ *
+ * @return The JobVertexID of this task.
+ */
+ JobVertexID getJobVertexId();
/**
* Returns the task configuration object which was attached to the original JobVertex.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index a7749cd..dc282b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.io.network.gates.GateID;
import org.apache.flink.runtime.io.network.gates.InputGate;
import org.apache.flink.runtime.io.network.gates.OutputGate;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -71,8 +72,6 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/** The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds). */
private static final int SLEEPINTERVAL = 100;
-
-
// --------------------------------------------------------------------------------------------
/** The task that owns this environment */
@@ -208,6 +207,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
public JobID getJobID() {
return this.owner.getJobID();
}
+
+ @Override
+ public JobVertexID getJobVertexId() {
+ return this.owner.getVertexID();
+ }
@Override
public GateID getNextUnboundInputGateID() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index d400908..65afc6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -130,12 +130,18 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
// re-read the iterative broadcast variables
for (int i : this.iterativeBroadcastInputs) {
final String name = getTaskConfig().getBroadcastInputName(i);
- readAndSetBroadcastInput(i, name, this.runtimeUdfContext);
+ readAndSetBroadcastInput(i, name, this.runtimeUdfContext, superstepNum);
}
}
// call the parent to execute the superstep
super.run();
+
+ // release the iterative broadcast variables
+ for (int i : this.iterativeBroadcastInputs) {
+ final String name = getTaskConfig().getBroadcastInputName(i);
+ releaseBroadcastVariables(name, superstepNum, this.runtimeUdfContext);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index e466e75..f7db9d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBro
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.util.Collector;
/**
@@ -39,8 +38,7 @@ import org.apache.flink.util.Collector;
* <p/>
* If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
*/
-public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT>
- implements PactTaskContext<S, OT> {
+public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
private static final Logger log = LoggerFactory.getLogger(IterationTailPactTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 35908a9..5cb406b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import org.slf4j.Logger;
@@ -30,11 +29,11 @@ import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -80,7 +79,7 @@ import java.util.Map;
public class RegularPactTask<S extends Function, OT> extends AbstractInvokable implements PactTaskContext<S, OT> {
protected static final Logger LOG = LoggerFactory.getLogger(RegularPactTask.class);
-
+
// --------------------------------------------------------------------------------------------
/**
@@ -127,13 +126,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
protected MutableObjectIterator<?>[] inputIterators;
/**
- * The input readers for the configured broadcast variables, wrapped in an iterator.
- * Prior to the local strategies, etc...
+ * The indices of the iterative inputs. Empty, if the task is not iterative.
*/
- protected MutableObjectIterator<?>[] broadcastInputIterators;
-
protected int[] iterativeInputs;
+ /**
+ * The indices of the iterative broadcast inputs. Empty, if none of the inputs is iterative.
+ */
protected int[] iterativeBroadcastInputs;
/**
@@ -210,6 +209,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
protected volatile boolean running = true;
+
// --------------------------------------------------------------------------------------------
// Task Interface
// --------------------------------------------------------------------------------------------
@@ -334,7 +334,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
initLocalStrategies(numInputs);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
throw new RuntimeException("Initializing the input processing failed" +
e.getMessage() == null ? "." : ": " + e.getMessage(), e);
}
@@ -349,10 +350,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// pre main-function initialization
initialize();
- // read the broadcast variables
+ // read the broadcast variables. they will be released in the finally clause
for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
final String name = this.config.getBroadcastInputName(i);
- readAndSetBroadcastInput(i, name, this.runtimeUdfContext);
+ readAndSetBroadcastInput(i, name, this.runtimeUdfContext, 1 /* superstep one for the start */);
}
// the work goes here
@@ -420,24 +421,33 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
}
- protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, RuntimeUDFContext context) throws IOException {
- // drain the broadcast inputs
-
- @SuppressWarnings("unchecked")
- final MutableObjectIterator<X> reader = (MutableObjectIterator<X>) this.broadcastInputIterators[inputNum];
+ protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, RuntimeUDFContext context, int superstep) throws IOException {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Setting broadcast variable '" + bcVarName + "'" +
+ (superstep > 1 ? ", superstep " + superstep : "")));
+ }
@SuppressWarnings("unchecked")
- final TypeSerializer<X> serializer = (TypeSerializer<X>) this.broadcastInputSerializers[inputNum].getSerializer();
+ final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.broadcastInputSerializers[inputNum];
+
+ final MutableReader<?> reader = this.broadcastInputReaders[inputNum];
- ArrayList<X> collection = new ArrayList<X>();
+ List<X> variable = BroadcastVariableManager.INSTANCE.getBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
- X record = serializer.createInstance();
- while (this.running && ((record = reader.next(record)) != null)) {
- collection.add(record);
- record = serializer.createInstance();
+ context.setBroadcastVariable(bcVarName, variable);
+ }
+
+ protected void releaseBroadcastVariables(String bcVarName, int superstep, RuntimeUDFContext context) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Releasing broadcast variable '" + bcVarName + "'" +
+ (superstep > 1 ? ", superstep " + superstep : "")));
}
- context.setBroadcastVariable(bcVarName, collection);
+
+ BroadcastVariableManager.INSTANCE.releaseReference(bcVarName, superstep, this);
+ context.clearBroadcastVariable(bcVarName);
}
+
protected void run() throws Exception {
// ---------------------------- Now, the actual processing starts ------------------------
@@ -515,7 +525,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
catch (Throwable t) {}
}
- // if resettable driver invoke treardown
+ // if resettable driver invoke teardown
if (this.driver instanceof ResettablePactDriver) {
final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
try {
@@ -594,6 +604,19 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
protected void closeLocalStrategiesAndCaches() {
+
+ // make sure that all broadcast variable references held by this task are released
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Releasing all broadcast variables."));
+ }
+
+ BroadcastVariableManager.INSTANCE.releaseAllReferencesFromTask(this);
+ if (runtimeUdfContext != null) {
+ runtimeUdfContext.clearAllBroadcastVariables();
+ }
+
+ // clean all local strategies and caches/pipeline breakers.
+
if (this.localStrategies != null) {
for (int i = 0; i < this.localStrategies.length; i++) {
if (this.localStrategies[i] != null) {
@@ -786,7 +809,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception {
this.broadcastInputSerializers = new TypeSerializerFactory[numBroadcastInputs];
- this.broadcastInputIterators = new MutableObjectIterator[numBroadcastInputs];
ClassLoader userCodeClassLoader = getUserCodeClassLoader();
@@ -794,8 +816,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// ---------------- create the serializer first ---------------------
final TypeSerializerFactory<?> serializerFactory = this.config.getBroadcastInputSerializer(i, userCodeClassLoader);
this.broadcastInputSerializers[i] = serializerFactory;
-
- this.broadcastInputIterators[i] = createInputIterator(this.broadcastInputReaders[i], this.broadcastInputSerializers[i]);
}
}
@@ -1493,6 +1513,4 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
return a;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index ee1a5b7..bd32f10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordD
import org.apache.flink.runtime.io.network.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.serialization.RecordDeserializer.DeserializationResult;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -345,4 +346,9 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
public Map<String, FutureTask<Path>> getCopyTask() {
return null;
}
+
+ @Override
+ public JobVertexID getJobVertexId() {
+ return new JobVertexID(new byte[16]);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
index 81950f2..08cded2 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.Visitor;
/**
* Utility to get operator instances from plans via name.
*/
+@SuppressWarnings("deprecation")
public class OperatorResolver implements Visitor<Operator<?>> {
private final Map<String, List<Operator<?>>> map;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 105943c..8460382 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -40,6 +40,7 @@ import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assert;
@@ -101,6 +102,9 @@ public abstract class AbstractTestBase {
} finally {
deleteAllTempFiles();
}
+
+ Assert.assertEquals("Not all broadcast variables were released.",
+ 0, BroadcastVariableManager.INSTANCE.getNumberOfVariablesWithReferences());
}
//------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
index 944d999..a274432 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -19,7 +19,9 @@
package org.apache.flink.test.broadcastvars;
import java.io.BufferedReader;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -198,7 +200,13 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
@Override
public void open(Configuration parameters) throws Exception {
- this.models = this.getRuntimeContext().getBroadcastVariable("models");
+ List<Record> shared = this.getRuntimeContext().getBroadcastVariable("models");
+ this.models = new ArrayList<Record>(shared.size());
+ synchronized (shared) {
+ for (Record r : shared) {
+ this.models.add(r.createCopy());
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 8227f97..afa38b5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -76,6 +76,7 @@ import org.junit.runners.Parameterized.Parameters;
* - intermediate workset update and solution set tail
* - intermediate solution set update and workset tail
*/
+@SuppressWarnings("deprecation")
@RunWith(Parameterized.class)
public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cc766533/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
index 61c5767..e5f519d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/KMeansBroadcast.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.recordJobs.kmeans;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -47,7 +46,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
-
+@SuppressWarnings("deprecation")
public class KMeansBroadcast implements Program, ProgramDescription {
private static final long serialVersionUID = 1L;
@@ -197,11 +196,13 @@ public class KMeansBroadcast implements Program, ProgramDescription {
*/
@Override
public void open(Configuration parameters) throws Exception {
- Collection<Record> clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
+ List<Record> clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
centers.clear();
- for (Record r : clusterCenters) {
- centers.add(new PointWithId(r.getField(0, IntValue.class).getValue(), r.getField(1, Point.class)));
+ synchronized (clusterCenters) {
+ for (Record r : clusterCenters) {
+ centers.add(new PointWithId(r.getField(0, IntValue.class).getValue(), r.getField(1, Point.class)));
+ }
}
}