You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/04 12:44:19 UTC
[1/7] flink git commit: [hotfix] [py] Improve error reporting in
Python*InputStreamer
Repository: flink
Updated Branches:
refs/heads/master 99fb1f881 -> a2ec3ee66
[hotfix] [py] Improve error reporting in Python*InputStreamer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2ec3ee6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2ec3ee6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2ec3ee6
Branch: refs/heads/master
Commit: a2ec3ee664b540c1213991d7fcf56d8873e60d40
Parents: 154bb3b
Author: zentol <ch...@apache.org>
Authored: Thu Apr 20 14:08:54 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200
----------------------------------------------------------------------
.../python/api/streaming/data/PythonDualInputStreamer.java | 6 +++---
.../python/api/streaming/data/PythonSingleInputStreamer.java | 6 ++----
2 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a2ec3ee6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
index b7e8a25..8c9fde9 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
-import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Iterator;
@@ -46,9 +45,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho
* @param iterator1 first input stream
* @param iterator2 second input stream
* @param c collector
- * @throws IOException
*/
- public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) throws IOException {
+ public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) {
SingleElementPushBackIterator<IN1> i1 = new SingleElementPushBackIterator<>(iterator1);
SingleElementPushBackIterator<IN2> i2 = new SingleElementPushBackIterator<>(iterator2);
try {
@@ -93,6 +91,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho
}
} catch (SocketTimeoutException ignored) {
throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+ } catch (Exception e) {
+ throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2ec3ee6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
index e7f018c..6c0a13c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
-import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Iterator;
@@ -43,9 +42,8 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin
*
* @param iterator input stream
* @param c collector
- * @throws IOException
*/
- public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) throws IOException {
+ public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) {
SingleElementPushBackIterator<IN> i = new SingleElementPushBackIterator<>(iterator);
try {
int size;
@@ -86,7 +84,7 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin
} catch (SocketTimeoutException ignored) {
throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg.get());
} catch (Exception e) {
- throw new RuntimeException("Critical failure. " + msg.get(), e);
+ throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e);
}
}
}
[4/7] flink git commit: [FLINK-6274] Replaces usages of
org.codehaus.jackson
Posted by ch...@apache.org.
[FLINK-6274] Replaces usages of org.codehaus.jackson
This closes #3780.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6aa5bad7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6aa5bad7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6aa5bad7
Branch: refs/heads/master
Commit: 6aa5bad75867395202a5e10a2215ce492ed7dcc6
Parents: 99fb1f8
Author: zentol <ch...@apache.org>
Authored: Thu Apr 6 14:33:47 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200
----------------------------------------------------------------------
.../flink/streaming/examples/twitter/TwitterExample.java | 4 ++--
.../util/AbstractDeserializationSchemaTest.java | 3 +--
.../test/optimizer/jsonplan/DumpCompiledPlanTest.java | 6 +++---
.../test/optimizer/jsonplan/PreviewPlanDumpTest.java | 11 ++++++-----
4 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6aa5bad7/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
index 4f13f34..6d1bce5 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
@@ -17,6 +17,8 @@
package org.apache.flink.streaming.examples.twitter;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -25,8 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
import org.apache.flink.util.Collector;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
import java.util.StringTokenizer;
http://git-wip-us.apache.org/repos/asf/flink/blob/6aa5bad7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
index 1ae144c..77178d7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
@@ -18,14 +18,13 @@
package org.apache.flink.streaming.util;
+import com.fasterxml.jackson.databind.util.JSONPObject;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
-import org.codehaus.jackson.map.util.JSONPObject;
-
import org.junit.Test;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/6aa5bad7/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 9f9d42b..6932fd2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.test.optimizer.jsonplan;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
@@ -31,9 +34,6 @@ import org.apache.flink.examples.java.relational.TPCHQuery3;
import org.apache.flink.examples.java.relational.WebLogAnalysis;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/6aa5bad7/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
index 6f02ab6..14f6656 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.test.optimizer.jsonplan;
import java.util.List;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
@@ -32,9 +35,6 @@ import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
import org.junit.Assert;
import org.junit.Test;
@@ -168,8 +168,9 @@ public class PreviewPlanDumpTest extends CompilerTestBase {
List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(p);
PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
String json = dumper.getPactPlanAsJSON(sinks);
- JsonParser parser = new JsonFactory().createJsonParser(json);
- while (parser.nextToken() != null);
+ try (JsonParser parser = new JsonFactory().createParser(json)) {
+ while (parser.nextToken() != null) ;
+ }
} catch (JsonParseException e) {
e.printStackTrace();
Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
[7/7] flink git commit: [FLINK-6177] Add support for "Distributed
Cache" in streaming applications
Posted by ch...@apache.org.
[FLINK-6177] Add support for "Distributed Cache" in streaming applications
This closes #3741.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3655dee5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3655dee5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3655dee5
Branch: refs/heads/master
Commit: 3655dee5b5feee46eaadeaae221fa8f358b90340
Parents: 127f1df
Author: Zohar Mizrahi <zo...@parallelmachines.com>
Authored: Sun Apr 9 12:11:57 2017 +0300
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200
----------------------------------------------------------------------
.../environment/StreamExecutionEnvironment.java | 48 ++++++++++++++++
.../api/graph/StreamingJobGraphGenerator.java | 6 ++
.../api/scala/StreamExecutionEnvironment.scala | 48 ++++++++++++++++
.../distributedCache/DistributedCacheTest.java | 60 +++++++++-----------
4 files changed, 130 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e827945..aad3a4b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.io.FileInputFormat;
@@ -46,6 +47,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
@@ -136,6 +138,8 @@ public abstract class StreamExecutionEnvironment {
/** The time characteristic used by the data streams. */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
+ protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();
+
// --------------------------------------------------------------------------------------------
// Constructor and Properties
@@ -149,6 +153,13 @@ public abstract class StreamExecutionEnvironment {
}
/**
+ * Get the list of cached files that were registered for distribution among the task managers.
+ */
+ public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
+ return cacheFile;
+ }
+
+ /**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as map,
* batchReduce) to run with x parallel instances. This method overrides the
@@ -1774,4 +1785,41 @@ public abstract class StreamExecutionEnvironment {
protected static void resetContextEnvironment() {
contextEnvironmentFactory = null;
}
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ *
+ * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+ * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
+ * {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ */
+ public void registerCachedFile(String filePath, String name) {
+ registerCachedFile(filePath, name, false);
+ }
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ *
+ * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+ * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
+ * {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ * @param executable flag indicating whether the file should be executable
+ */
+ public void registerCachedFile(String filePath, String name, boolean executable) {
+ this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index aa0f08d..b3a6cf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -151,6 +152,11 @@ public class StreamingJobGraphGenerator {
configureCheckpointing();
+ // add registered cache file into job configuration
+ for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
+ DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
+ }
+
// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 0b2587e..742baf9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -50,6 +50,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
def getConfig = javaEnv.getConfig
/**
+ * Gets cache files.
+ */
+ def getCachedFiles = javaEnv.getCachedFiles
+
+ /**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
* with x parallel instances. This value can be overridden by specific operations using
@@ -668,6 +673,49 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
}
f
}
+
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it), or files in a
+ * distributed file system. The runtime will copy the files temporarily to a local cache,
+ * if needed.
+ * <p>
+ * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
+ * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
+ * provides access {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
+ * "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ */
+ def registerCachedFile(filePath: String, name: String): Unit = {
+ javaEnv.registerCachedFile(filePath, name)
+ }
+
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it), or files in a
+ * distributed file system. The runtime will copy the files temporarily to a local cache,
+ * if needed.
+ * <p>
+ * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
+ * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
+ * provides access {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
+ * "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ * @param executable flag indicating whether the file should be executable
+ */
+ def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit = {
+ javaEnv.registerCachedFile(filePath, name, executable)
+ }
}
object StreamExecutionEnvironment {
http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
index 4fb2d95..19bcf76 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
@@ -15,29 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.test.distributedCache;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+package org.apache.flink.test.distributedCache;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.*;
+
+import java.util.*;
-/**
- * Tests the distributed cache by comparing a text file with a distributed copy.
- */
-public class DistributedCacheTest extends JavaProgramTestBase {
+public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
public static final String data
= "machen\n"
+ "zeit\n"
@@ -45,33 +42,31 @@ public class DistributedCacheTest extends JavaProgramTestBase {
+ "keiner\n"
+ "meine\n";
- protected String textPath;
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("count.txt", data);
+ @Test
+ public void testStreamingDistributedCache() throws Exception {
+ String textPath = createTempFile("count.txt", data);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.registerCachedFile(textPath, "cache_test");
+ env.readTextFile(textPath).flatMap(new WordChecker());
+ env.execute();
}
- @Override
- protected void testProgram() throws Exception {
+ @Test
+ public void testBatchDistributedCache() throws Exception {
+ String textPath = createTempFile("count.txt", data);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile(textPath, "cache_test");
-
- List<Tuple1<String>> result = env
- .readTextFile(textPath)
- .flatMap(new WordChecker())
- .collect();
-
- compareResultAsTuples(result, data);
+ env.readTextFile(textPath).flatMap(new WordChecker()).count();
}
public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
private static final long serialVersionUID = 1L;
- private final Set<String> wordList = new HashSet<>();
+ private final List<String> wordList = new ArrayList<>();
@Override
- public void open(Configuration conf) throws FileNotFoundException, IOException {
+ public void open(Configuration conf) throws IOException {
File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
BufferedReader reader = new BufferedReader(new FileReader(file));
String tempString;
@@ -83,9 +78,10 @@ public class DistributedCacheTest extends JavaProgramTestBase {
@Override
public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception {
- if (wordList.contains(word)) {
- out.collect(new Tuple1<>(word));
- }
+ assertTrue("Unexpected word in stream! wordFromStream: " + word + ", shouldBeOneOf: " +
+ wordList.toString(), wordList.contains(word));
+
+ out.collect(new Tuple1<>(word));
}
}
}
[5/7] flink git commit: [FLINK-6367] support custom header settings
of allow origin
Posted by ch...@apache.org.
[FLINK-6367] support custom header settings of allow origin
This closes #3769.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/127f1df6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/127f1df6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/127f1df6
Branch: refs/heads/master
Commit: 127f1df642ecc332cab742bd040e67613e4bad25
Parents: 49d0e43
Author: shijinkui <sh...@huawei.com>
Authored: Tue Apr 25 20:09:15 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200
----------------------------------------------------------------------
docs/setup/config.md | 2 ++
.../java/org/apache/flink/configuration/ConfigConstants.java | 6 ++++++
.../flink/runtime/webmonitor/RuntimeMonitorHandler.java | 7 ++++++-
.../org/apache/flink/runtime/webmonitor/WebMonitorConfig.java | 4 ++++
.../apache/flink/runtime/webmonitor/WebRuntimeMonitor.java | 7 ++++---
5 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 250d92f..a3daac0 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -375,6 +375,8 @@ These parameters allow for advanced tuning. The default values are sufficient wh
- `jobmanager.web.ssl.enabled`: Enable https access to the web frontend. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: `true`).
+- `jobmanager.web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.
+
### File Systems
The parameters define the behavior of tasks that create result files.
http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 61c1b27..975a3d4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1333,6 +1333,12 @@ public final class ConfigConstants {
key("jobmanager.web.address")
.noDefaultValue();
+ /** The config parameter defining the Access-Control-Allow-Origin header for all
+ * responses from the web-frontend. */
+ public static final ConfigOption<String> JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN =
+ key("jobmanager.web.access-control-allow-origin")
+ .defaultValue("*");
+
/** The config key for the port of the JobManager web frontend.
* Setting this value to {@code -1} disables the web frontend. */
public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 3d7fbed..90fe2e0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -66,14 +66,19 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
private final RequestHandler handler;
+ private final String allowOrigin;
+
public RuntimeMonitorHandler(
+ WebMonitorConfig cfg,
RequestHandler handler,
JobManagerRetriever retriever,
Future<String> localJobManagerAddressFuture,
FiniteDuration timeout,
boolean httpsEnabled) {
+
super(retriever, localJobManagerAddressFuture, timeout, httpsEnabled);
this.handler = checkNotNull(handler);
+ this.allowOrigin = cfg.getAllowOrigin();
}
@Override
@@ -122,7 +127,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
LOG.debug("Error while handling request", e);
}
- response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+ response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
// Content-Encoding:utf-8
response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, ENCODING.name());
http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index 18fc5e8..11e94b0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -78,4 +78,8 @@ public class WebMonitorConfig {
ConfigConstants.JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED);
}
+
+ public String getAllowOrigin() {
+ return config.getString(ConfigConstants.JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index c1e6229..f83fa27 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -136,6 +136,8 @@ public class WebRuntimeMonitor implements WebMonitor {
private final BackPressureStatsTracker backPressureStatsTracker;
+ private final WebMonitorConfig cfg;
+
private AtomicBoolean cleanedUp = new AtomicBoolean();
private ExecutorService executorService;
@@ -150,8 +152,7 @@ public class WebRuntimeMonitor implements WebMonitor {
this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
this.timeout = AkkaUtils.getTimeout(config);
this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout);
-
- final WebMonitorConfig cfg = new WebMonitorConfig(config);
+ this.cfg = new WebMonitorConfig(config);
final String configuredAddress = cfg.getWebFrontendAddress();
@@ -515,7 +516,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// ------------------------------------------------------------------------
private RuntimeMonitorHandler handler(RequestHandler handler) {
- return new RuntimeMonitorHandler(handler, retriever, jobManagerAddressPromise.future(), timeout,
+ return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressPromise.future(), timeout,
serverSSLContext != null);
}
[2/7] flink git commit: [hotfix] [py] Code cleanup - Functions
Posted by ch...@apache.org.
[hotfix] [py] Code cleanup - Functions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36950629
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36950629
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36950629
Branch: refs/heads/master
Commit: 369506291a483158bd041f31ebcfc6e7fe86a263
Parents: 3655dee
Author: zentol <ch...@apache.org>
Authored: Thu Apr 20 13:26:38 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200
----------------------------------------------------------------------
.../flink/python/api/functions/util/IdentityGroupReduce.java | 2 ++
.../org/apache/flink/python/api/functions/util/SerializerMap.java | 2 +-
2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/36950629/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
index 1e7bbe6..32fd22a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
@@ -12,12 +12,14 @@
*/
package org.apache.flink.python.api.functions.util;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.functions.GroupReduceFunction;
/*
Utility function to group and sort data.
*/
+@ForwardedFields("*->*")
public class IdentityGroupReduce<IN> implements GroupReduceFunction<IN, IN> {
@Override
public final void reduce(Iterable<IN> values, Collector<IN> out) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/36950629/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
index 9c39e5f..116efd4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
@@ -20,7 +20,7 @@ import org.apache.flink.python.api.streaming.util.SerializationUtils.Serializer;
Utility function to serialize values, usually directly from data sources.
*/
public class SerializerMap<IN> implements MapFunction<IN, byte[]> {
- private Serializer<IN> serializer = null;
+ private transient Serializer<IN> serializer;
@Override
@SuppressWarnings("unchecked")
[6/7] flink git commit: [hotfix] [py] Code cleanup -
SerializationUtils
Posted by ch...@apache.org.
[hotfix] [py] Code cleanup - SerializationUtils
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/154bb3bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/154bb3bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/154bb3bb
Branch: refs/heads/master
Commit: 154bb3bb09dcc6dbc3dd7e1a6984ff5afd41431b
Parents: 3695062
Author: zentol <ch...@apache.org>
Authored: Thu Apr 20 13:27:15 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200
----------------------------------------------------------------------
.../api/streaming/plan/PythonPlanSender.java | 1 -
.../api/streaming/util/SerializationUtils.java | 67 ++++++++++++--------
2 files changed, 40 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/154bb3bb/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
index 8b8366a..331c67e 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -28,7 +28,6 @@ public class PythonPlanSender {
this.output = new DataOutputStream(output);
}
- @SuppressWarnings("unchecked")
public void sendRecord(Object record) throws IOException {
byte[] data = SerializationUtils.getSerializer(record).serialize(record);
output.write(data);
http://git-wip-us.apache.org/repos/asf/flink/blob/154bb3bb/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
index f228327..721746a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -20,21 +20,22 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
public class SerializationUtils {
- public static final byte TYPE_BOOLEAN = (byte) 34;
- public static final byte TYPE_BYTE = (byte) 33;
- public static final byte TYPE_INTEGER = (byte) 32;
- public static final byte TYPE_LONG = (byte) 31;
- public static final byte TYPE_DOUBLE = (byte) 30;
- public static final byte TYPE_FLOAT = (byte) 29;
- public static final byte TYPE_STRING = (byte) 28;
- public static final byte TYPE_BYTES = (byte) 27;
- public static final byte TYPE_NULL = (byte) 26;
+ public static final byte TYPE_BOOLEAN = 34;
+ public static final byte TYPE_BYTE = 33;
+ public static final byte TYPE_INTEGER = 32;
+ public static final byte TYPE_LONG = 31;
+ public static final byte TYPE_DOUBLE = 30;
+ public static final byte TYPE_FLOAT = 29;
+ public static final byte TYPE_STRING = 28;
+ public static final byte TYPE_BYTES = 27;
+ public static final byte TYPE_NULL = 26;
private enum SupportedTypes {
TUPLE, BOOLEAN, BYTE, BYTES, INTEGER, LONG, FLOAT, DOUBLE, STRING, NULL, CUSTOMTYPEWRAPPER
}
- public static Serializer getSerializer(Object value) {
+ @SuppressWarnings("unchecked")
+ public static <IN> Serializer<IN> getSerializer(IN value) {
String className = value.getClass().getSimpleName().toUpperCase();
if (className.startsWith("TUPLE")) {
className = "TUPLE";
@@ -43,35 +44,48 @@ public class SerializationUtils {
className = "BYTES";
}
SupportedTypes type = SupportedTypes.valueOf(className);
+ Serializer<?> serializer;
switch (type) {
case TUPLE:
- return new TupleSerializer((Tuple) value);
+ serializer = new TupleSerializer((Tuple) value);
+ break;
case BOOLEAN:
- return new BooleanSerializer();
+ serializer = new BooleanSerializer();
+ break;
case BYTE:
- return new ByteSerializer();
+ serializer = new ByteSerializer();
+ break;
case BYTES:
- return new BytesSerializer();
+ serializer = new BytesSerializer();
+ break;
case INTEGER:
- return new IntSerializer();
+ serializer = new IntSerializer();
+ break;
case LONG:
- return new LongSerializer();
+ serializer = new LongSerializer();
+ break;
case STRING:
- return new StringSerializer();
+ serializer = new StringSerializer();
+ break;
case FLOAT:
- return new FloatSerializer();
+ serializer = new FloatSerializer();
+ break;
case DOUBLE:
- return new DoubleSerializer();
+ serializer = new DoubleSerializer();
+ break;
case NULL:
- return new NullSerializer();
+ serializer = new NullSerializer();
+ break;
case CUSTOMTYPEWRAPPER:
- return new CustomTypeWrapperSerializer((CustomTypeWrapper) value);
+ serializer = new CustomTypeWrapperSerializer((CustomTypeWrapper) value);
+ break;
default:
throw new IllegalArgumentException("Unsupported Type encountered: " + type);
}
+ return (Serializer<IN>) serializer;
}
- public static abstract class Serializer<IN> {
+ public abstract static class Serializer<IN> {
private byte[] typeInfo = null;
public byte[] serialize(IN value) {
@@ -130,7 +144,7 @@ public class SerializationUtils {
public static class BooleanSerializer extends Serializer<Boolean> {
@Override
public byte[] serializeWithoutTypeInfo(Boolean value) {
- return new byte[]{value ? (byte) 1 : (byte) 0};
+ return new byte[]{(byte) (value ? 1 : 0)};
}
@Override
@@ -237,7 +251,7 @@ public class SerializationUtils {
}
public static class TupleSerializer extends Serializer<Tuple> {
- private final Serializer[] serializer;
+ private final Serializer<Object>[] serializer;
public TupleSerializer(Tuple value) {
serializer = new Serializer[value.getArity()];
@@ -246,7 +260,6 @@ public class SerializationUtils {
}
}
- @SuppressWarnings("unchecked")
@Override
public byte[] serializeWithoutTypeInfo(Tuple value) {
ArrayList<byte[]> bits = new ArrayList<>();
@@ -269,7 +282,7 @@ public class SerializationUtils {
@Override
public void putTypeInfo(ByteBuffer buffer) {
buffer.put((byte) serializer.length);
- for (Serializer s : serializer) {
+ for (Serializer<Object> s : serializer) {
s.putTypeInfo(buffer);
}
}
@@ -277,7 +290,7 @@ public class SerializationUtils {
@Override
public int getTypeInfoSize() {
int size = 1;
- for (Serializer s : serializer) {
+ for (Serializer<Object> s : serializer) {
size += s.getTypeInfoSize();
}
return size;
[3/7] flink git commit: [FLINK-3709] Prevent caching of outdated
suspended ExecutionGraphs
Posted by ch...@apache.org.
[FLINK-3709] Prevent caching of outdated suspended ExecutionGraphs
This closes #3709.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49d0e432
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49d0e432
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49d0e432
Branch: refs/heads/master
Commit: 49d0e4321df1853d00e26593556c34acb9bed3d2
Parents: 6aa5bad
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Tue Apr 11 19:48:52 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/webmonitor/ExecutionGraphHolder.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/49d0e432/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 3d0cfc0..f9faa85 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.slf4j.Logger;
@@ -67,14 +68,18 @@ public class ExecutionGraphHolder {
public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
AccessExecutionGraph cached = cache.get(jid);
if (cached != null) {
- return cached;
+ if (cached.getState() == JobStatus.SUSPENDED) {
+ cache.remove(jid);
+ } else {
+ return cached;
+ }
}
try {
if (jobManager != null) {
Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
Object result = Await.result(future, timeout);
-
+
if (result instanceof JobManagerMessages.JobNotFound) {
return null;
}