Posted to commits@flink.apache.org by se...@apache.org on 2015/08/16 19:44:35 UTC

[1/3] flink git commit: [docs] Fix some errors in programming_guide.md

Repository: flink
Updated Branches:
  refs/heads/master c2b1eb796 -> 358259d25


[docs] Fix some errors in programming_guide.md


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c1b5f0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c1b5f0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c1b5f0e

Branch: refs/heads/master
Commit: 3c1b5f0e7f28d18868e941712d2ca42b140ef3a0
Parents: c2b1eb7
Author: CHEN LIANG <ch...@sina.cn>
Authored: Sat Aug 15 00:04:08 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 16 18:21:05 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c1b5f0e/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 85c639e..eb874b9 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -217,13 +217,11 @@ programs with a `main()` method. Each program consists of the same basic parts:
 1. Obtain an `ExecutionEnvironment`,
 2. Load/create the initial data,
 3. Specify transformations on this data,
-4. Specify where to put the results of your computations, and
+4. Specify where to put the results of your computations,
 5. Trigger the program execution
 
 We will now give an overview of each of those steps, please refer to the respective sections for
-more details. Note that all
-{% gh_link /flink-java/src/main/java/org/apache/flink/api/java "core classes of the Java API" %}
-are found in the package `org.apache.flink.api.java`.
+more details. Note that all core classes of the Java API are found in the package {% gh_link /flink-java/src/main/java/org/apache/flink/api/java "org.apache.flink.api.java" %}.
 
 The `ExecutionEnvironment` is the basis for all Flink programs. You can
 obtain one using these static methods on class `ExecutionEnvironment`:
@@ -231,6 +229,8 @@ obtain one using these static methods on class `ExecutionEnvironment`:
 {% highlight java %}
 getExecutionEnvironment()
 
+createCollectionsEnvironment()
+
 createLocalEnvironment()
 createLocalEnvironment(int parallelism)
 createLocalEnvironment(Configuration customConfiguration)
@@ -272,7 +272,7 @@ a map transformation looks like this:
 {% highlight java %}
 DataSet<String> input = ...;
 
-DataSet<Integer> tokenized = text.map(new MapFunction<String, Integer>() {
+DataSet<Integer> tokenized = input.map(new MapFunction<String, Integer>() {
     @Override
     public Integer map(String value) {
         return Integer.parseInt(value);
@@ -284,7 +284,7 @@ This will create a new DataSet by converting every String in the original
 set to an Integer. For more information and a list of all the transformations,
 please refer to [Transformations](#transformations).
 
-Once you have a DataSet containing your final results, you can either write the result
+Once you have a DataSet containing your final results, you can either write the result 
 to a file system (HDFS or local) or print it.
 
 {% highlight java %}
@@ -307,10 +307,10 @@ programs with a `main()` method. Each program consists of the same basic parts:
 1. Obtain an `ExecutionEnvironment`,
 2. Load/create the initial data,
 3. Specify transformations on this data,
-4. Specify where to put the results of your computations, and
+4. Specify where to put the results of your computations,
 5. Trigger the program execution
 
-We will now give an overview of each of those steps but please refer to the respective sections for
+We will now give an overview of each of those steps, please refer to the respective sections for
 more details. Note that all core classes of the Scala API are found in the package 
 {% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala "org.apache.flink.api.scala" %}.
 
@@ -324,6 +324,8 @@ def getExecutionEnvironment
 def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()))
 def createLocalEnvironment(customConfiguration: Configuration)
 
+def createCollectionsEnvironment
+
 def createRemoteEnvironment(host: String, port: String, jarFiles: String*)
 def createRemoteEnvironment(host: String, port: String, parallelism: Int, jarFiles: String*)
 {% endhighlight %}
@@ -361,7 +363,7 @@ a map transformation looks like this:
 {% highlight scala %}
 val input: DataSet[String] = ...
 
-val mapped = text.map { x => x.toInt }
+val mapped = input.map { x => x.toInt }
 {% endhighlight %}
 
 This will create a new DataSet by converting every String in the original


[3/3] flink git commit: [FLINK-2458] [FLINK-2449] [runtime] Access distributed cache entries from Iteration contexts & use of distributed cache from Collection Environments

Posted by se...@apache.org.
[FLINK-2458] [FLINK-2449] [runtime] Access distributed cache entries from Iteration contexts & use of distributed cache from Collection Environments

This closes #970


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/358259d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/358259d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/358259d2

Branch: refs/heads/master
Commit: 358259d25edc25251073e26f47e3960a69990098
Parents: 0a7cc02
Author: Sachin Goel <sa...@gmail.com>
Authored: Sat Aug 1 19:25:42 2015 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 16 18:34:37 2015 +0200

----------------------------------------------------------------------
 .../util/AbstractRuntimeUDFContext.java         | 13 +--
 .../functions/util/RuntimeUDFContext.java       |  9 +-
 .../common/operators/CollectionExecutor.java    | 86 +++++++++++++++++---
 .../functions/util/RuntimeUDFContextTest.java   | 12 +--
 .../api/common/io/RichInputFormatTest.java      |  5 +-
 .../api/common/io/RichOutputFormatTest.java     |  5 +-
 .../operators/GenericDataSinkBaseTest.java      |  7 +-
 .../operators/GenericDataSourceBaseTest.java    |  7 +-
 .../base/FlatMapOperatorCollectionTest.java     |  4 +-
 .../operators/base/JoinOperatorBaseTest.java    |  8 +-
 .../common/operators/base/MapOperatorTest.java  |  7 +-
 .../base/PartitionMapOperatorTest.java          |  6 +-
 .../base/CoGroupOperatorCollectionTest.java     |  5 +-
 .../operators/base/GroupReduceOperatorTest.java |  6 +-
 .../operators/base/JoinOperatorBaseTest.java    |  6 +-
 .../operators/base/ReduceOperatorTest.java      |  6 +-
 .../task/AbstractIterativePactTask.java         |  9 +-
 .../util/DistributedRuntimeUDFContext.java      |  6 --
 .../flink/tez/runtime/RegularProcessor.java     |  3 +
 .../aggregators/AggregatorsITCase.java          | 45 +++++++++-
 20 files changed, 185 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 13d79e7..71be1e1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -57,17 +57,6 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 
 	private final DistributedCache distributedCache;
 
-
-	public AbstractRuntimeUDFContext(String name,
-										int numParallelSubtasks, int subtaskIndex,
-										ClassLoader userCodeClassLoader,
-										ExecutionConfig executionConfig,
-										Map<String, Accumulator<?,?>> accumulators)
-	{
-		this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig,
-				accumulators, Collections.<String, Future<Path>>emptyMap());
-	}
-
 	public AbstractRuntimeUDFContext(String name,
 										int numParallelSubtasks, int subtaskIndex,
 										ClassLoader userCodeClassLoader,
@@ -79,7 +68,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 		this.subtaskIndex = subtaskIndex;
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.executionConfig = executionConfig;
-		this.distributedCache = new DistributedCache(cpTasks);
+		this.distributedCache = new DistributedCache(Preconditions.checkNotNull(cpTasks));
 		this.accumulators = Preconditions.checkNotNull(accumulators);
 	}
 

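In the AbstractRuntimeUDFContext change above, the constructor overload that silently substituted `Collections.<String, Future<Path>>emptyMap()` is removed. The remaining constructor wraps `cpTasks` in `Preconditions.checkNotNull`, so every caller must now pass a cached-files map explicitly, as the updated tests do with `new HashMap<String, Future<Path>>()`. The following is a minimal plain-Java sketch of that fail-fast pattern, not Flink's code. `ContextSketch` and `hasCachedFile` are hypothetical names, and `java.util.Objects.requireNonNull` stands in for `Preconditions.checkNotNull`.

```java
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;

// Hypothetical, simplified stand-in for a runtime context; not Flink's class.
final class ContextSketch {

    private final Map<String, Future<Path>> cachedFiles;

    ContextSketch(Map<String, Future<Path>> cachedFiles) {
        // Reject a missing map when the context is built, rather than failing later
        // when a user function first looks up a cached file.
        this.cachedFiles = Objects.requireNonNull(cachedFiles, "cachedFiles");
    }

    boolean hasCachedFile(String name) {
        return cachedFiles.containsKey(name);
    }

    public static void main(String[] args) {
        // Callers pass an explicit (possibly empty) map, like the updated tests.
        ContextSketch ok = new ContextSketch(new HashMap<String, Future<Path>>());
        System.out.println(ok.hasCachedFile("dict")); // prints "false"

        try {
            new ContextSketch(null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage()); // prints "rejected: cachedFiles"
        }
    }
}
```

The trade-off is a little more boilerplate at each call site. In return, a context can never be created without a cache map.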
http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index 1689138..c582768 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -37,18 +37,11 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 	private final HashMap<String, Object> initializedBroadcastVars = new HashMap<String, Object>();
 	
 	private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
-	
-	
-	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
-							ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators);
-	}
-	
+
 	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
 							ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
 		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
 	}
-	
 
 	@Override
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index e8e0012..b6d8128 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -27,6 +27,10 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -37,6 +41,7 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
@@ -51,6 +56,8 @@ import org.apache.flink.api.common.operators.util.TypeComparable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Visitor;
 
@@ -64,6 +71,8 @@ public class CollectionExecutor {
 	private final Map<Operator<?>, List<?>> intermediateResults;
 	
 	private final Map<String, Accumulator<?, ?>> accumulators;
+
+	private final Map<String, Future<Path>> cachedFiles;
 	
 	private final Map<String, Value> previousAggregates;
 	
@@ -84,7 +93,7 @@ public class CollectionExecutor {
 		this.accumulators = new HashMap<String, Accumulator<?,?>>();
 		this.previousAggregates = new HashMap<String, Value>();
 		this.aggregators = new HashMap<String, Aggregator<?>>();
-		
+		this.cachedFiles = new HashMap<String, Future<Path>>();
 		this.classLoader = getClass().getClassLoader();
 	}
 	
@@ -94,7 +103,7 @@ public class CollectionExecutor {
 	
 	public JobExecutionResult execute(Plan program) throws Exception {
 		long startTime = System.currentTimeMillis();
-		
+		initCache(program.getCachedFiles());
 		Collection<? extends GenericDataSinkBase<?>> sinks = program.getDataSinks();
 		for (Operator<?> sink : sinks) {
 			execute(sink);
@@ -104,7 +113,14 @@ public class CollectionExecutor {
 		Map<String, Object> accumulatorResults = AccumulatorHelper.toResultMap(accumulators);
 		return new JobExecutionResult(null, endTime - startTime, accumulatorResults);
 	}
-	
+
+	private void initCache(Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> files){
+		for(Map.Entry<String, DistributedCache.DistributedCacheEntry> file: files){
+			Future<Path> doNothing = new CompletedFuture(new Path(file.getValue().filePath));
+			cachedFiles.put(file.getKey(), doNothing);
+		}
+	};
+
 	private List<?> execute(Operator<?> operator) throws Exception {
 		return execute(operator, 0);
 	}
@@ -165,8 +181,8 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(typedSink.getName(), 1, 0, getClass().getClassLoader(), executionConfig, accumulators) :
-					new IterationRuntimeUDFContext(typedSink.getName(), 1, 0, classLoader, executionConfig, accumulators);
+			ctx = superStep == 0 ? new RuntimeUDFContext(typedSink.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) :
+					new IterationRuntimeUDFContext(typedSink.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators);
 		} else {
 			ctx = null;
 		}
@@ -181,8 +197,8 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(source.getName(), 1, 0, getClass().getClassLoader(), executionConfig, accumulators) :
-					new IterationRuntimeUDFContext(source.getName(), 1, 0, classLoader, executionConfig, accumulators);
+			ctx = superStep == 0 ? new RuntimeUDFContext(source.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) :
+					new IterationRuntimeUDFContext(source.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators);
 		} else {
 			ctx = null;
 		}
@@ -204,8 +220,10 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig, accumulators) :
-					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass()
+					.getClassLoader(), executionConfig, cachedFiles, accumulators) :
+					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader,
+							executionConfig, cachedFiles, accumulators);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -243,8 +261,10 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators) :
-				new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader,
+					executionConfig, cachedFiles, accumulators) :
+				new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader,
+						executionConfig, cachedFiles, accumulators);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -500,8 +520,9 @@ public class CollectionExecutor {
 	private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {
 
 		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader,
-										ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
-			super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, accumulators);
+										ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String,
+				Accumulator<?,?>> accumulators) {
+			super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, cpTasks, accumulators);
 		}
 
 		@Override
@@ -521,4 +542,43 @@ public class CollectionExecutor {
 			return (T) previousAggregates.get(name);
 		}
 	}
+
+	private static final class CompletedFuture implements Future<Path>{
+
+		private final Path result;
+
+		public CompletedFuture(Path entry) {
+			try{
+				LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem();
+				result = entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
+			} catch (Exception e){
+				throw new RuntimeException("DistributedCache supports only local files for Collection Environments");
+			}
+		}
+
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			return false;
+		}
+
+		@Override
+		public boolean isCancelled() {
+			return false;
+		}
+
+		@Override
+		public boolean isDone() {
+			return true;
+		}
+
+		@Override
+		public Path get() throws InterruptedException, ExecutionException {
+			return result;
+		}
+
+		@Override
+		public Path get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+			return get();
+		}
+	}
 }

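In the CollectionExecutor change above, collection execution runs inside the local JVM, so the distributed cache does not copy anything. `initCache()` wraps each registered path in an already-completed `Future<Path>`. `CompletedFuture` keeps absolute paths and resolves relative ones against the working directory. It fails for non-local file systems. One shared `cachedFiles` map is then handed to every `RuntimeUDFContext` and `IterationRuntimeUDFContext`.

The following is a minimal plain-Java sketch of that idea under stated assumptions, not Flink's implementation:

- It uses Java 8's `CompletableFuture.completedFuture` in place of the hand-written `CompletedFuture` class.
- It uses `java.nio.file.Path` instead of `org.apache.flink.core.fs.Path`.
- It assumes Unix-style paths or `file:` URIs.
- `CollectionCacheSketch`, `resolveLocal`, and the name-to-path `initCache` signature are hypothetical.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical sketch of serving cached files for local (collection) execution.
final class CollectionCacheSketch {

    // Keep absolute paths, resolve relative ones against the working directory,
    // and reject any URI scheme other than "file" (only local files are supported).
    static Path resolveLocal(String filePath) {
        URI uri = URI.create(filePath);
        String scheme = uri.getScheme();
        if (scheme != null && !scheme.equals("file")) {
            throw new IllegalArgumentException("only local files are supported: " + filePath);
        }
        Path p = (scheme == null) ? Paths.get(filePath) : Paths.get(uri);
        return p.isAbsolute() ? p : Paths.get(System.getProperty("user.dir")).resolve(p);
    }

    // Analogue of initCache(): one already-completed future per registered entry,
    // so lookups never block and no file is copied.
    static Map<String, Future<Path>> initCache(Map<String, String> registered) {
        Map<String, Future<Path>> cached = new HashMap<>();
        for (Map.Entry<String, String> e : registered.entrySet()) {
            cached.put(e.getKey(), CompletableFuture.completedFuture(resolveLocal(e.getValue())));
        }
        return cached;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> registered = new HashMap<>();
        registered.put("dict", "data/dictionary.txt");
        Future<Path> f = initCache(registered).get("dict");
        System.out.println(f.isDone() + " " + f.get().isAbsolute()); // prints "true true"
    }
}
```

Because every future is already done, `get()` returns immediately. This fits a single-JVM executor where no asynchronous file transfer takes place.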
http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
index 9189d5b..5e8f891 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -24,10 +24,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.core.fs.Path;
 import org.junit.Test;
 
 
@@ -36,7 +38,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableNotFound() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>());
 			
 			try {
 				ctx.getBroadcastVariable("some name");
@@ -66,7 +68,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableSimple() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
 			
 			ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
 			ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
@@ -100,7 +102,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableWithInitializer() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
 			
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 			
@@ -125,7 +127,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testResetBroadcastVariableWithInitializer() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
 			
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 			
@@ -148,7 +150,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableWithInitializerAndMismatch() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
 			
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
index 126a511..2a9cef3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
@@ -20,10 +20,12 @@
 package org.apache.flink.api.common.io;
 
 import java.util.HashMap;
+import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,8 +38,7 @@ public class RichInputFormatTest {
 	@Test
 	public void testCheckRuntimeContextAccess() {
 		final SerializedInputFormat<Value> inputFormat = new SerializedInputFormat<Value>();
-		inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1,
-				getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>()));
+		inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()));
 
 		Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
 		Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
index 8d41039..10d03ab 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
@@ -20,10 +20,12 @@
 package org.apache.flink.api.common.io;
 
 import java.util.HashMap;
+import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,8 +38,7 @@ public class RichOutputFormatTest {
 	@Test
 	public void testCheckRuntimeContextAccess() {
 		final SerializedOutputFormat<Value> inputFormat = new SerializedOutputFormat<Value>();
-		inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1,
-				getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>()));
+		inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()));
 
 		Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
 		Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
index e30e536..9a46305 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
@@ -26,10 +26,12 @@ import org.apache.flink.api.common.operators.util.TestNonRichOutputFormat;
 import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.common.operators.util.TestRichOutputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.Nothing;
 import org.junit.Test;
 
 import java.util.HashMap;
+import java.util.concurrent.Future;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -87,15 +89,16 @@ public class GenericDataSinkBaseTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
+			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
 			executionConfig.disableObjectReuse();
 			in.reset();
-			sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, null, executionConfig, accumulatorMap), executionConfig);
+			sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
 			assertEquals(out.output, asList(TestIOData.RICH_NAMES));
 
 			executionConfig.enableObjectReuse();
 			out.clear();
 			in.reset();
-			sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, null, executionConfig, accumulatorMap), executionConfig);
+			sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
 			assertEquals(out.output, asList(TestIOData.RICH_NAMES));
 		} catch(Exception e){
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
index 64d3358..50b8d80 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
@@ -25,10 +25,12 @@ import org.apache.flink.api.common.operators.util.TestIOData;
 import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.common.operators.util.TestRichInputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.core.fs.Path;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -73,13 +75,14 @@ public class GenericDataSourceBaseTest implements java.io.Serializable {
 							in, new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO), "testSource");
 
 			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
+			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, accumulatorMap), executionConfig);
+			List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
 
 			in.reset();
 			executionConfig.enableObjectReuse();
-			List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, accumulatorMap), executionConfig);
+			List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
 
 			assertEquals(asList(TestIOData.RICH_NAMES), resultMutableSafe);
 			assertEquals(asList(TestIOData.RICH_NAMES), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
index 734324b..745cf09 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,6 +37,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Future;
 
 @SuppressWarnings("serial")
 public class FlatMapOperatorCollectionTest implements Serializable {
@@ -74,7 +76,7 @@ public class FlatMapOperatorCollectionTest implements Serializable {
 		}
 		// run on collections
 		final List<String> result = getTestFlatMapOperator(udf)
-				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 
 		Assert.assertEquals(input.size(), result.size());
 		Assert.assertEquals(input, result);

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index 98f75bc..6d4ff33 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -36,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 @SuppressWarnings("serial")
@@ -117,11 +119,13 @@ public class JoinOperatorBaseTest implements Serializable {
 
 		try {
 			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
+			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
+			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
+			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
 
 			assertEquals(expected, resultSafe);
 			assertEquals(expected, resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index 2c98a17..0dbe1b6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -24,6 +24,7 @@ import static java.util.Arrays.asList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -36,6 +37,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -105,11 +107,12 @@ public class MapOperatorTest implements java.io.Serializable {
 			
 			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
 			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
+			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
 			
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index 28c6d821..47f30de 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -24,10 +24,12 @@ import static java.util.Arrays.asList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
@@ -80,9 +82,9 @@ public class PartitionMapOperatorTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index 71a2eb7..025fcfb 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.builder.Tuple2Builder;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
@@ -40,6 +41,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Future;
 
 @SuppressWarnings("serial")
 public class CoGroupOperatorCollectionTest implements Serializable {
@@ -71,7 +73,8 @@ public class CoGroupOperatorCollectionTest implements Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
-			final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, accumulators);
+			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+			final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, cpTasks, accumulators);
 
 			{
 				SumCoGroup udf1 = new SumCoGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index 08f4acd..9e1684d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -37,6 +38,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Arrays.asList;
@@ -163,9 +165,9 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			
 			
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index 21fcfb3..7390af2 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -38,6 +39,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Future;
 
 @SuppressWarnings({ "unchecked", "serial" })
 public class JoinOperatorBaseTest implements Serializable {
@@ -105,9 +107,9 @@ public class JoinOperatorBaseTest implements Serializable {
 		try {
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 
 			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultSafe));
 			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultRegular));

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index 7cd9771..29faf03 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -35,6 +36,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Arrays.asList;
@@ -140,9 +142,9 @@ public class ReduceOperatorTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
 			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 67d8f56..efe74f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.iterative.task;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.core.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
@@ -55,6 +56,7 @@ import org.apache.flink.util.MutableObjectIterator;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 /**
  * The abstract base class for all tasks able to participate in an iteration.
@@ -169,7 +171,8 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 	public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
 		Environment env = getEnvironment();
 		return new IterativeRuntimeUdfContext(taskName, env.getNumberOfSubtasks(),
-				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), this.accumulatorMap);
+				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(),
+				env.getDistributedCacheEntries(), this.accumulatorMap);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -359,9 +362,9 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 	private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
 
 		public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
-										ExecutionConfig executionConfig,
+										ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks,
 										Map<String, Accumulator<?,?>> accumulatorMap) {
-			super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulatorMap);
+			super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
index 4b480ba..f74989e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
@@ -41,12 +41,6 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext {
 
 	private final HashMap<String, BroadcastVariableMaterialization<?, ?>> broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>();
 	
-	
-	public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
-										ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators);
-	}
-	
 	public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
 										ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
 		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
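
The hunks above thread a `Map<String, Future<Path>>` (named `cpTasks`) into every `RuntimeUDFContext` / `DistributedRuntimeUDFContext` constructor. The unit tests pass an empty map, while the iterative task now fills it from `env.getDistributedCacheEntries()`. The following is a rough, hypothetical sketch of why a map of futures fits a cache lookup: each registered name resolves to a copy task, and a lookup blocks until that copy has finished. `CachedFileLookup` is an illustrative name, not Flink's `DistributedCache`, and it uses `java.nio.file.Path` in place of `org.apache.flink.core.fs.Path`.

```java
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Hypothetical stand-in for a UDF context's cached-file lookup.
 * Each registered name maps to a Future that completes once the
 * file has been copied to local storage.
 */
public class CachedFileLookup {

    private final Map<String, Future<Path>> cpTasks;

    public CachedFileLookup(Map<String, Future<Path>> cpTasks) {
        this.cpTasks = cpTasks;
    }

    /** Blocks until the copy task for {@code name} finishes, then returns the local file. */
    public File getFile(String name) throws InterruptedException, ExecutionException {
        Future<Path> copyTask = cpTasks.get(name);
        if (copyTask == null) {
            // With an empty map (as the unit tests pass), every lookup ends up here.
            throw new IllegalArgumentException("No cached file registered under '" + name + "'");
        }
        return copyTask.get().toFile();
    }

    public static void main(String[] args) throws Exception {
        Map<String, Future<Path>> tasks = new HashMap<>();
        // An already-completed future simulates a copy task that has finished.
        tasks.put("testing_caesar", CompletableFuture.completedFuture(Paths.get("caesar.txt")));
        CachedFileLookup lookup = new CachedFileLookup(tasks);
        System.out.println(lookup.getFile("testing_caesar").getName());
    }
}
```

In this sketch, a context built without the real cache entries (for example, one that received an empty map) can only fail on lookup. That is consistent with the iterative context now receiving `env.getDistributedCacheEntries()` instead of being constructed without them.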

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
index 5bfa49b..b117bab 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.tez.runtime;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.operators.PactDriver;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.tez.util.EncodingUtils;
@@ -40,6 +41,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 
 public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOProcessor {
@@ -70,6 +72,7 @@ public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOP
 				getContext().getTaskIndex(),
 				getClass().getClassLoader(),
 				new ExecutionConfig(),
+				new HashMap<String, Future<Path>>(),
 				new HashMap<String, Accumulator<?, ?>>());
 
 		this.task = new TezTask<S, OT>(taskConfig, runtimeUdfContext, this.getContext().getTotalMemoryAvailableToTask());

http://git-wip-us.apache.org/repos/asf/flink/blob/358259d2/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 44544d3..8b98b29 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.test.iterative.aggregators;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
 import java.util.Random;
 
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.After;
 import org.junit.Assert;
@@ -44,6 +50,8 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Test the functionality of aggregators in bulk and delta iterative cases.
  */
@@ -54,6 +62,10 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	private static final int parallelism = 2;
 	private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
 
+	private static String testString = "Et tu, Brute?";
+	private static String testName = "testing_caesar";
+	private static String testPath;
+
 	public AggregatorsITCase(TestExecutionMode mode){
 		super(mode);
 	}
@@ -66,7 +78,9 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 
 	@Before
 	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
+		File tempFile = tempFolder.newFile();
+		testPath = tempFile.toString();
+		resultPath = tempFile.toURI().toString();
 	}
 
 	@After
@@ -75,6 +89,35 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testDistributedCacheWithIterations() throws Exception{
+		File tempFile = new File(testPath);
+		FileWriter writer = new FileWriter(tempFile);
+		writer.write(testString);
+		writer.close();
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.registerCachedFile(resultPath, testName);
+
+		IterativeDataSet<Long> solution = env.fromElements(1L).iterate(2);
+		solution.closeWith(env.generateSequence(1,2).filter(new RichFilterFunction<Long>() {
+			@Override
+			public void open(Configuration parameters) throws Exception{
+				File file = getRuntimeContext().getDistributedCache().getFile(testName);
+				BufferedReader reader = new BufferedReader(new FileReader(file));
+				String output = reader.readLine();
+				reader.close();
+				assertEquals(output, testString);
+			}
+			@Override
+			public boolean filter(Long value) throws Exception {
+				return false;
+			}
+		}).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat<Long>());
+		env.execute();
+		expected = testString; // this will be a useless verification now.
+	}
+
+	@Test
 	public void testAggregatorWithoutParameterForIterate() throws Exception {
 		/*
 		 * Test aggregator without parameter for iterate
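
The new `testDistributedCacheWithIterations` writes the test string to a temp file, registers it with `env.registerCachedFile(resultPath, testName)`, and reads its first line back in `open()` of a `RichFilterFunction` that runs inside a bulk iteration. That reader and the `FileWriter` are closed manually, so they leak if `readLine()` or `write()` throws. The following is a small, standalone sketch of the same write/read-first-line step using try-with-resources. `FirstLineReader` is a hypothetical helper and not part of the commit.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

public class FirstLineReader {

    /** Returns the first line of the file, or null if the file is empty. */
    public static String readFirstLine(File file) throws IOException {
        // try-with-resources closes the reader even if readLine() throws.
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            return reader.readLine();
        }
    }

    /** Writes the text to the file, closing the writer on every path. */
    public static void write(File file, String text) throws IOException {
        try (FileWriter writer = new FileWriter(file)) {
            writer.write(text);
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("caesar", ".txt");
        tmp.deleteOnExit();
        write(tmp, "Et tu, Brute?");
        System.out.println(readFirstLine(tmp)); // prints "Et tu, Brute?"
    }
}
```

JUnit's `assertEquals` takes `(expected, actual)`. The test calls `assertEquals(output, testString)`, which swaps the two. The pass/fail result is the same, but a failure message would label the values the wrong way round.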


[2/3] flink git commit: [FLINK-2487] [ml] In cosine distance, check that datapoints are of same cardinality

Posted by se...@apache.org.
[FLINK-2487] [ml] In cosine distance, check that datapoints are of same cardinality

This closes #1021


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a7cc023
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a7cc023
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a7cc023

Branch: refs/heads/master
Commit: 0a7cc02354abc985b92704729a8c12a856056398
Parents: 3c1b5f0
Author: Rucongzhang <zh...@huawei.com>
Authored: Sat Aug 15 17:25:25 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 16 18:25:59 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/windowing/deltafunction/CosineDistance.java   | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a7cc023/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
index 15aaf51..77486d0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
@@ -52,6 +52,11 @@ public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, dou
 			return 0;
 		}
 
+		if (oldDataPoint.length != newDataPoint.length) {
+			throw new IllegalArgumentException(
+					"The size of two input arrays are not same, can not compute cosine distance");
+		}
+
 		double sum1 = 0;
 		double sum2 = 0;
 		for (int i = 0; i < oldDataPoint.length; i++) {
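
The loop bound is `oldDataPoint.length`. Assuming the loop body indexes both arrays, the code before this check would throw `ArrayIndexOutOfBoundsException` when the new point is shorter, and would silently ignore trailing entries when the new point is longer. The explicit length check turns both cases into a clear `IllegalArgumentException`. The following is a standalone sketch of cosine distance, 1 - (a·b)/(|a||b|), with the same guard. It is not Flink's `CosineDistance` class. The `return 0` for null or empty input is an assumption based on the early `return 0;` visible in the hunk context, whose condition is not shown.

```java
public final class CosineDistanceSketch {

    private CosineDistanceSketch() {}

    /**
     * Cosine distance 1 - (a.b) / (|a| * |b|).
     * Returns 0 for null or empty input (assumed, see lead-in) and
     * rejects vectors of different length, like the new check.
     */
    public static double distance(double[] a, double[] b) {
        if (a == null || b == null || a.length == 0 || b.length == 0) {
            return 0;
        }
        if (a.length != b.length) {
            throw new IllegalArgumentException("Input arrays differ in length ("
                    + a.length + " vs " + b.length + "); cannot compute cosine distance");
        }
        double dot = 0;
        double normA = 0;
        double normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        // A zero vector makes the denominator 0 and the result NaN; not handled here.
        return 1.0 - dot / Math.sqrt(normA * normB);
    }

    public static void main(String[] args) {
        System.out.println(distance(new double[] {1, 0}, new double[] {0, 1})); // prints 1.0 (orthogonal)
        System.out.println(distance(new double[] {1, 2}, new double[] {2, 4})); // prints 0.0 (same direction)
    }
}
```

Taking a single `Math.sqrt(normA * normB)` rather than multiplying two square roots keeps the parallel-vector case exactly at 0.0 in this example. Either form is mathematically equivalent.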