Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/04 12:44:25 UTC

[7/7] flink git commit: [FLINK-6177] Add support for "Distributed Cache" in streaming applications

[FLINK-6177] Add support for "Distributed Cache" in streaming applications

This closes #3741.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3655dee5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3655dee5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3655dee5

Branch: refs/heads/master
Commit: 3655dee5b5feee46eaadeaae221fa8f358b90340
Parents: 127f1df
Author: Zohar Mizrahi <zo...@parallelmachines.com>
Authored: Sun Apr 9 12:11:57 2017 +0300
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 48 ++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   |  6 ++
 .../api/scala/StreamExecutionEnvironment.scala  | 48 ++++++++++++++++
 .../distributedCache/DistributedCacheTest.java  | 60 +++++++++-----------
 4 files changed, 130 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e827945..aad3a4b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.io.FileInputFormat;
@@ -46,6 +47,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
@@ -136,6 +138,8 @@ public abstract class StreamExecutionEnvironment {
 	/** The time characteristic used by the data streams. */
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
+	protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();
+
 
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
@@ -149,6 +153,13 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Gets the list of cached files that were registered for distribution among the task managers.
+	 */
+	public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
+		return cacheFile;
+	}
+
+	/**
 	 * Sets the parallelism for operations executed through this environment.
 	 * Setting a parallelism of x here will cause all operators (such as map,
 	 * batchReduce) to run with x parallel instances. This method overrides the
@@ -1774,4 +1785,41 @@ public abstract class StreamExecutionEnvironment {
 	protected static void resetContextEnvironment() {
 		contextEnvironmentFactory = null;
 	}
+
+	/**
+	 * Registers a file at the distributed cache under the given name. The file will be accessible
+	 * from any user-defined function in the (distributed) runtime under a local path. Files
+	 * may be local files (as long as all relevant workers have access to them), or files in a distributed file system.
+	 * The runtime will copy the files temporarily to a local cache, if needed.
+	 *
+	 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access to
+	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
+	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+	 *
+	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+	 * @param name The name under which the file is registered.
+	 */
+	public void registerCachedFile(String filePath, String name) {
+		registerCachedFile(filePath, name, false);
+	}
+
+	/**
+	 * Registers a file at the distributed cache under the given name. The file will be accessible
+	 * from any user-defined function in the (distributed) runtime under a local path. Files
+	 * may be local files (as long as all relevant workers have access to them), or files in a distributed file system.
+	 * The runtime will copy the files temporarily to a local cache, if needed.
+	 *
+	 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access to
+	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
+	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+	 *
+	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+	 * @param name The name under which the file is registered.
+	 * @param executable flag indicating whether the file should be executable
+	 */
+	public void registerCachedFile(String filePath, String name, boolean executable) {
+		this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
+	}
 }
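
For context, a minimal end-to-end sketch of how the new streaming methods are intended to be used, mirroring the access pattern exercised in DistributedCacheTest further below. The sketch is not part of the commit; the class name, the HDFS path and the cache name "stopwords" are placeholders:

    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class CachedFileUsageSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Register the file once on the client; the runtime ships a local copy to the task managers.
            env.registerCachedFile("hdfs:///path/to/stopwords.txt", "stopwords");

            env.fromElements("machen", "zeit", "keiner")
                .flatMap(new StopwordFilter())
                .print();

            env.execute("distributed cache usage sketch");
        }

        private static class StopwordFilter extends RichFlatMapFunction<String, String> {

            private Set<String> stopwords;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Retrieve the local copy of the registered file by its registered name.
                File file = getRuntimeContext().getDistributedCache().getFile("stopwords");
                stopwords = new HashSet<>(Files.readAllLines(file.toPath(), StandardCharsets.UTF_8));
            }

            @Override
            public void flatMap(String word, Collector<String> out) {
                if (!stopwords.contains(word)) {
                    out.collect(word);
                }
            }
        }
    }

As in the batch ExecutionEnvironment, the file is registered on the client and each task manager works against a temporary local copy, retrieved by name through the RuntimeContext.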

http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index aa0f08d..b3a6cf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -151,6 +152,11 @@ public class StreamingJobGraphGenerator {
 
 		configureCheckpointing();
 
+		// add registered cache files into the job configuration
+		for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
+			DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
+		}
+
 		// set the ExecutionConfig last when it has been finalized
 		try {
 			jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
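
The loop above serializes every registered (name, entry) pair into the JobGraph's job configuration. A small round-trip sketch of what that amounts to, using a plain Configuration object. writeFileInfoToConfig is the call used in the commit; readFileInfoFromConfig and the public filePath field of DistributedCacheEntry are recalled from flink-core and should be treated as assumptions here:

    import java.util.Map;
    import java.util.Set;

    import org.apache.flink.api.common.cache.DistributedCache;
    import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
    import org.apache.flink.configuration.Configuration;

    public class CacheFileConfigRoundTripSketch {

        public static void main(String[] args) {
            Configuration jobConfiguration = new Configuration();

            // What the generator does for each registered file: write the (name, entry) pair into the config.
            DistributedCache.writeFileInfoToConfig(
                "stopwords",
                new DistributedCacheEntry("hdfs:///path/to/stopwords.txt", false),
                jobConfiguration);

            // Assumed runtime-side counterpart that reads the registered entries back out of the config.
            Set<Map.Entry<String, DistributedCacheEntry>> entries =
                DistributedCache.readFileInfoFromConfig(jobConfiguration);

            for (Map.Entry<String, DistributedCacheEntry> entry : entries) {
                System.out.println(entry.getKey() + " -> " + entry.getValue().filePath);
            }
        }
    }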

http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 0b2587e..742baf9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -50,6 +50,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def getConfig = javaEnv.getConfig
 
   /**
+    * Gets the cached files registered for distribution among the task managers.
+    */
+  def getCachedFiles = javaEnv.getCachedFiles
+
+  /**
    * Sets the parallelism for operations executed through this environment.
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
@@ -668,6 +673,49 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     }
     f
   }
+
+
+  /**
+    * Registers a file at the distributed cache under the given name. The file will be accessible
+    * from any user-defined function in the (distributed) runtime under a local path. Files
+    * may be local files (as long as all relevant workers have access to them), or files in a
+    * distributed file system. The runtime will copy the files temporarily to a local cache,
+    * if needed.
+    * <p>
+    * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
+    * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
+    * provides access to {@link org.apache.flink.api.common.cache.DistributedCache} via
+    * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+    *
+    * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
+    *                 "hdfs://host:port/and/path")
+    * @param name     The name under which the file is registered.
+    */
+  def registerCachedFile(filePath: String, name: String): Unit = {
+    javaEnv.registerCachedFile(filePath, name)
+  }
+
+
+  /**
+    * Registers a file at the distributed cache under the given name. The file will be accessible
+    * from any user-defined function in the (distributed) runtime under a local path. Files
+    * may be local files (as long as all relevant workers have access to them), or files in a
+    * distributed file system. The runtime will copy the files temporarily to a local cache,
+    * if needed.
+    * <p>
+    * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
+    * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
+    * provides access to {@link org.apache.flink.api.common.cache.DistributedCache} via
+    * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+    *
+    * @param filePath   The path of the file, as a URI (e.g. "file:///some/path" or
+    *                   "hdfs://host:port/and/path")
+    * @param name       The name under which the file is registered.
+    * @param executable flag indicating whether the file should be executable
+    */
+  def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit = {
+    javaEnv.registerCachedFile(filePath, name, executable)
+  }
 }
 
 object StreamExecutionEnvironment {
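
The Scala methods above are thin delegations to the wrapped Java environment, so files registered through either API end up in the same list that StreamingJobGraphGenerator copies into the job configuration. A short sketch, written against the Java environment since the Scala variants just forward to it, showing how getCachedFiles reflects a registration including the executable flag; the public filePath and isExecutable fields of DistributedCacheEntry are assumptions, as are the file path and name:

    import java.util.List;

    import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class GetCachedFilesSketch {

        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Register a script that should be marked executable on the task managers.
            env.registerCachedFile("file:///tmp/init.sh", "init_script", true);

            // The registrations are visible on the environment and later copied into the job configuration.
            List<Tuple2<String, DistributedCacheEntry>> cachedFiles = env.getCachedFiles();
            for (Tuple2<String, DistributedCacheEntry> file : cachedFiles) {
                System.out.println(file.f0 + " -> " + file.f1.filePath
                    + " (executable=" + file.f1.isExecutable + ")");
            }
        }
    }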

http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
index 4fb2d95..19bcf76 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
@@ -15,29 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.test.distributedCache;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+package org.apache.flink.test.distributedCache;
 
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.*;
+
+import java.util.*;
 
-/**
- * Tests the distributed cache by comparing a text file with a distributed copy.
- */
-public class DistributedCacheTest extends JavaProgramTestBase {
 
+public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
 	public static final String data
 			= "machen\n"
 			+ "zeit\n"
@@ -45,33 +42,31 @@ public class DistributedCacheTest extends JavaProgramTestBase {
 			+ "keiner\n"
 			+ "meine\n";
 
-	protected String textPath;
 
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("count.txt", data);
+	@Test
+	public void testStreamingDistributedCache() throws Exception {
+		String textPath = createTempFile("count.txt", data);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.registerCachedFile(textPath, "cache_test");
+		env.readTextFile(textPath).flatMap(new WordChecker());
+		env.execute();
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
+	@Test
+	public void testBatchDistributedCache() throws Exception {
+		String textPath = createTempFile("count.txt", data);
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.registerCachedFile(textPath, "cache_test");
-
-		List<Tuple1<String>> result = env
-				.readTextFile(textPath)
-				.flatMap(new WordChecker())
-				.collect();
-
-		compareResultAsTuples(result, data);
+		env.readTextFile(textPath).flatMap(new WordChecker()).count();
 	}
 
 	public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
-		private final Set<String> wordList = new HashSet<>();
+		private final List<String> wordList = new ArrayList<>();
 
 		@Override
-		public void open(Configuration conf) throws FileNotFoundException, IOException {
+		public void open(Configuration conf) throws IOException {
 			File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
 			BufferedReader reader = new BufferedReader(new FileReader(file));
 			String tempString;
@@ -83,9 +78,10 @@ public class DistributedCacheTest extends JavaProgramTestBase {
 
 		@Override
 		public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception {
-			if (wordList.contains(word)) {
-				out.collect(new Tuple1<>(word));
-			}
+			assertTrue("Unexpected word in stream! wordFromStream: " + word + ", shouldBeOneOf: " +
+				wordList.toString(), wordList.contains(word));
+
+			out.collect(new Tuple1<>(word));
 		}
 	}
 }