You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/04 12:44:25 UTC
[7/7] flink git commit: [FLINK-6177] Add support for "Distributed
Cache" in streaming applications
[FLINK-6177] Add support for "Distributed Cache" in streaming applications
This closes #3741.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3655dee5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3655dee5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3655dee5
Branch: refs/heads/master
Commit: 3655dee5b5feee46eaadeaae221fa8f358b90340
Parents: 127f1df
Author: Zohar Mizrahi <zo...@parallelmachines.com>
Authored: Sun Apr 9 12:11:57 2017 +0300
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200
----------------------------------------------------------------------
.../environment/StreamExecutionEnvironment.java | 48 ++++++++++++++++
.../api/graph/StreamingJobGraphGenerator.java | 6 ++
.../api/scala/StreamExecutionEnvironment.scala | 48 ++++++++++++++++
.../distributedCache/DistributedCacheTest.java | 60 +++++++++-----------
4 files changed, 130 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e827945..aad3a4b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.io.FileInputFormat;
@@ -46,6 +47,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
@@ -136,6 +138,8 @@ public abstract class StreamExecutionEnvironment {
/** The time characteristic used by the data streams. */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
+ protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();
+
// --------------------------------------------------------------------------------------------
// Constructor and Properties
@@ -149,6 +153,13 @@ public abstract class StreamExecutionEnvironment {
}
/**
+ * Gets the list of cached files that were registered for distribution among the task managers.
+ */
+ public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
+ return cacheFile;
+ }
+
+ /**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as map,
* batchReduce) to run with x parallel instances. This method overrides the
@@ -1774,4 +1785,41 @@ public abstract class StreamExecutionEnvironment {
protected static void resetContextEnvironment() {
contextEnvironmentFactory = null;
}
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to them), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ *
+ * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+ * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access to the
+ * {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ */
+ public void registerCachedFile(String filePath, String name) {
+ registerCachedFile(filePath, name, false);
+ }
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to them), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ *
+ * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+ * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access to the
+ * {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ * @param executable flag indicating whether the file should be executable
+ */
+ public void registerCachedFile(String filePath, String name, boolean executable) {
+ this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index aa0f08d..b3a6cf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -151,6 +152,11 @@ public class StreamingJobGraphGenerator {
configureCheckpointing();
+ // add the registered cache files to the job configuration
+ for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
+ DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
+ }
+
// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 0b2587e..742baf9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -50,6 +50,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
def getConfig = javaEnv.getConfig
/**
+ * Gets the list of cached files registered for distribution among the task managers.
+ */
+ def getCachedFiles = javaEnv.getCachedFiles
+
+ /**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
* with x parallel instances. This value can be overridden by specific operations using
@@ -668,6 +673,49 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
}
f
}
+
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to them), or files in a
+ * distributed file system. The runtime will copy the files temporarily to a local cache,
+ * if needed.
+ * <p>
+ * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
+ * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
+ * provides access to the {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
+ * "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ */
+ def registerCachedFile(filePath: String, name: String): Unit = {
+ javaEnv.registerCachedFile(filePath, name)
+ }
+
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to them), or files in a
+ * distributed file system. The runtime will copy the files temporarily to a local cache,
+ * if needed.
+ * <p>
+ * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
+ * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
+ * provides access to the {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
+ * "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ * @param executable flag indicating whether the file should be executable
+ */
+ def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit = {
+ javaEnv.registerCachedFile(filePath, name, executable)
+ }
}
object StreamExecutionEnvironment {
http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
index 4fb2d95..19bcf76 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
@@ -15,29 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.test.distributedCache;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+package org.apache.flink.test.distributedCache;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.*;
+
+import java.util.*;
-/**
- * Tests the distributed cache by comparing a text file with a distributed copy.
- */
-public class DistributedCacheTest extends JavaProgramTestBase {
+public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
public static final String data
= "machen\n"
+ "zeit\n"
@@ -45,33 +42,31 @@ public class DistributedCacheTest extends JavaProgramTestBase {
+ "keiner\n"
+ "meine\n";
- protected String textPath;
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("count.txt", data);
+ @Test
+ public void testStreamingDistributedCache() throws Exception {
+ String textPath = createTempFile("count.txt", data);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.registerCachedFile(textPath, "cache_test");
+ env.readTextFile(textPath).flatMap(new WordChecker());
+ env.execute();
}
- @Override
- protected void testProgram() throws Exception {
+ @Test
+ public void testBatchDistributedCache() throws Exception {
+ String textPath = createTempFile("count.txt", data);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile(textPath, "cache_test");
-
- List<Tuple1<String>> result = env
- .readTextFile(textPath)
- .flatMap(new WordChecker())
- .collect();
-
- compareResultAsTuples(result, data);
+ env.readTextFile(textPath).flatMap(new WordChecker()).count();
}
public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
private static final long serialVersionUID = 1L;
- private final Set<String> wordList = new HashSet<>();
+ private final List<String> wordList = new ArrayList<>();
@Override
- public void open(Configuration conf) throws FileNotFoundException, IOException {
+ public void open(Configuration conf) throws IOException {
File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
BufferedReader reader = new BufferedReader(new FileReader(file));
String tempString;
@@ -83,9 +78,10 @@ public class DistributedCacheTest extends JavaProgramTestBase {
@Override
public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception {
- if (wordList.contains(word)) {
- out.collect(new Tuple1<>(word));
- }
+ assertTrue("Unexpected word in stream! wordFromStream: " + word + ", shouldBeOneOf: " +
+ wordList.toString(), wordList.contains(word));
+
+ out.collect(new Tuple1<>(word));
}
}
}