You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/04 12:44:19 UTC

[1/7] flink git commit: [hotfix] [py] Improve error reporting in Python*InputStreamer

Repository: flink
Updated Branches:
  refs/heads/master 99fb1f881 -> a2ec3ee66


[hotfix] [py] Improve error reporting in Python*InputStreamer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2ec3ee6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2ec3ee6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2ec3ee6

Branch: refs/heads/master
Commit: a2ec3ee664b540c1213991d7fcf56d8873e60d40
Parents: 154bb3b
Author: zentol <ch...@apache.org>
Authored: Thu Apr 20 14:08:54 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../python/api/streaming/data/PythonDualInputStreamer.java     | 6 +++---
 .../python/api/streaming/data/PythonSingleInputStreamer.java   | 6 ++----
 2 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2ec3ee6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
index b7e8a25..8c9fde9 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
-import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
 
@@ -46,9 +45,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho
 	 * @param iterator1 first input stream
 	 * @param iterator2 second input stream
 	 * @param c         collector
-	 * @throws IOException
 	 */
-	public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) throws IOException {
+	public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) {
 		SingleElementPushBackIterator<IN1> i1 = new SingleElementPushBackIterator<>(iterator1);
 		SingleElementPushBackIterator<IN2> i2 = new SingleElementPushBackIterator<>(iterator2);
 		try {
@@ -93,6 +91,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho
 			}
 		} catch (SocketTimeoutException ignored) {
 			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+		} catch (Exception e) {
+			throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2ec3ee6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
index e7f018c..6c0a13c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
-import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
 
@@ -43,9 +42,8 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin
 	 *
 	 * @param iterator input stream
 	 * @param c        collector
-	 * @throws IOException
 	 */
-	public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) throws IOException {
+	public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) {
 		SingleElementPushBackIterator<IN> i = new SingleElementPushBackIterator<>(iterator);
 		try {
 			int size;
@@ -86,7 +84,7 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin
 		} catch (SocketTimeoutException ignored) {
 			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg.get());
 		} catch (Exception e) {
-			throw new RuntimeException("Critical failure. " + msg.get(), e);
+			throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e);
 		}
 	}
 }


[4/7] flink git commit: [FLINK-6274] Replaces usages of org.codehaus.jackson

Posted by ch...@apache.org.
[FLINK-6274] Replaces usages of org.codehaus.jackson

This closes #3780.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6aa5bad7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6aa5bad7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6aa5bad7

Branch: refs/heads/master
Commit: 6aa5bad75867395202a5e10a2215ce492ed7dcc6
Parents: 99fb1f8
Author: zentol <ch...@apache.org>
Authored: Thu Apr 6 14:33:47 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/examples/twitter/TwitterExample.java |  4 ++--
 .../util/AbstractDeserializationSchemaTest.java          |  3 +--
 .../test/optimizer/jsonplan/DumpCompiledPlanTest.java    |  6 +++---
 .../test/optimizer/jsonplan/PreviewPlanDumpTest.java     | 11 ++++++-----
 4 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6aa5bad7/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
index 4f13f34..6d1bce5 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.examples.twitter;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -25,8 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.twitter.TwitterSource;
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
 import org.apache.flink.util.Collector;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
 
 import java.util.StringTokenizer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6aa5bad7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
index 1ae144c..77178d7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.streaming.util;
 
+import com.fasterxml.jackson.databind.util.JSONPObject;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
 
-import org.codehaus.jackson.map.util.JSONPObject;
-
 import org.junit.Test;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/6aa5bad7/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 9f9d42b..6932fd2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.test.optimizer.jsonplan;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.client.program.OptimizerPlanEnvironment;
@@ -31,9 +34,6 @@ import org.apache.flink.examples.java.relational.TPCHQuery3;
 import org.apache.flink.examples.java.relational.WebLogAnalysis;
 import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6aa5bad7/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
index 6f02ab6..14f6656 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.test.optimizer.jsonplan;
 
 import java.util.List;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
@@ -32,9 +35,6 @@ import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -168,8 +168,9 @@ public class PreviewPlanDumpTest extends CompilerTestBase {
 			List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(p);
 			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
 			String json = dumper.getPactPlanAsJSON(sinks);
-			JsonParser parser = new JsonFactory().createJsonParser(json);
-			while (parser.nextToken() != null);
+			try (JsonParser parser = new JsonFactory().createParser(json)) {
+				while (parser.nextToken() != null) ;
+			}
 		} catch (JsonParseException e) {
 			e.printStackTrace();
 			Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());


[7/7] flink git commit: [FLINK-6177] Add support for "Distributed Cache" in streaming applications

Posted by ch...@apache.org.
[FLINK-6177] Add support for "Distributed Cache" in streaming applications

This closes #3741.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3655dee5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3655dee5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3655dee5

Branch: refs/heads/master
Commit: 3655dee5b5feee46eaadeaae221fa8f358b90340
Parents: 127f1df
Author: Zohar Mizrahi <zo...@parallelmachines.com>
Authored: Sun Apr 9 12:11:57 2017 +0300
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 48 ++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   |  6 ++
 .../api/scala/StreamExecutionEnvironment.scala  | 48 ++++++++++++++++
 .../distributedCache/DistributedCacheTest.java  | 60 +++++++++-----------
 4 files changed, 130 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e827945..aad3a4b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.io.FileInputFormat;
@@ -46,6 +47,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
@@ -136,6 +138,8 @@ public abstract class StreamExecutionEnvironment {
 	/** The time characteristic used by the data streams. */
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
+	protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();
+
 
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
@@ -149,6 +153,13 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	* Get the list of cached files that were registered for distribution among the task managers.
+	*/
+	public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
+		return cacheFile;
+	}
+
+	/**
 	 * Sets the parallelism for operations executed through this environment.
 	 * Setting a parallelism of x here will cause all operators (such as map,
 	 * batchReduce) to run with x parallel instances. This method overrides the
@@ -1774,4 +1785,41 @@ public abstract class StreamExecutionEnvironment {
 	protected static void resetContextEnvironment() {
 		contextEnvironmentFactory = null;
 	}
+
+	/**
+	 * Registers a file at the distributed cache under the given name. The file will be accessible
+	 * from any user-defined function in the (distributed) runtime under a local path. Files
+	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+	 * The runtime will copy the files temporarily to a local cache, if needed.
+	 *
+	 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
+	 * to the {@link org.apache.flink.api.common.cache.DistributedCache} via
+	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+	 *
+	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+	 * @param name The name under which the file is registered.
+	 */
+	public void registerCachedFile(String filePath, String name) {
+		registerCachedFile(filePath, name, false);
+	}
+
+	/**
+	 * Registers a file at the distributed cache under the given name. The file will be accessible
+	 * from any user-defined function in the (distributed) runtime under a local path. Files
+	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+	 * The runtime will copy the files temporarily to a local cache, if needed.
+	 *
+	 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
+	 * to the {@link org.apache.flink.api.common.cache.DistributedCache} via
+	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+	 *
+	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+	 * @param name The name under which the file is registered.
+	 * @param executable flag indicating whether the file should be executable
+	 */
+	public void registerCachedFile(String filePath, String name, boolean executable) {
+		this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index aa0f08d..b3a6cf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -151,6 +152,11 @@ public class StreamingJobGraphGenerator {
 
 		configureCheckpointing();
 
+		// add registered cache file into job configuration
+		for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
+			DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
+		}
+
 		// set the ExecutionConfig last when it has been finalized
 		try {
 			jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());

http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 0b2587e..742baf9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -50,6 +50,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def getConfig = javaEnv.getConfig
 
   /**
+    * Gets the cached files registered for distribution among the task managers.
+    */
+  def getCachedFiles = javaEnv.getCachedFiles
+
+  /**
    * Sets the parallelism for operations executed through this environment.
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
@@ -668,6 +673,49 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     }
     f
   }
+
+
+  /**
+    * Registers a file at the distributed cache under the given name. The file will be accessible
+    * from any user-defined function in the (distributed) runtime under a local path. Files
+    * may be local files (as long as all relevant workers have access to it), or files in a
+    * distributed file system. The runtime will copy the files temporarily to a local cache,
+    * if needed.
+    * <p>
+    * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
+    * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
+    * provides access to the {@link org.apache.flink.api.common.cache.DistributedCache} via
+    * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+    *
+    * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
+    *                 "hdfs://host:port/and/path")
+    * @param name     The name under which the file is registered.
+    */
+  def registerCachedFile(filePath: String, name: String): Unit = {
+    javaEnv.registerCachedFile(filePath, name)
+  }
+
+
+  /**
+    * Registers a file at the distributed cache under the given name. The file will be accessible
+    * from any user-defined function in the (distributed) runtime under a local path. Files
+    * may be local files (as long as all relevant workers have access to it), or files in a
+    * distributed file system. The runtime will copy the files temporarily to a local cache,
+    * if needed.
+    * <p>
+    * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
+    * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
+    * provides access to the {@link org.apache.flink.api.common.cache.DistributedCache} via
+    * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+    *
+    * @param filePath   The path of the file, as a URI (e.g. "file:///some/path" or
+    *                   "hdfs://host:port/and/path")
+    * @param name       The name under which the file is registered.
+    * @param executable flag indicating whether the file should be executable
+    */
+  def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit = {
+    javaEnv.registerCachedFile(filePath, name, executable)
+  }
 }
 
 object StreamExecutionEnvironment {

http://git-wip-us.apache.org/repos/asf/flink/blob/3655dee5/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
index 4fb2d95..19bcf76 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
@@ -15,29 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.test.distributedCache;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+package org.apache.flink.test.distributedCache;
 
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.*;
+
+import java.util.*;
 
-/**
- * Tests the distributed cache by comparing a text file with a distributed copy.
- */
-public class DistributedCacheTest extends JavaProgramTestBase {
 
+public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
 	public static final String data
 			= "machen\n"
 			+ "zeit\n"
@@ -45,33 +42,31 @@ public class DistributedCacheTest extends JavaProgramTestBase {
 			+ "keiner\n"
 			+ "meine\n";
 
-	protected String textPath;
 
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("count.txt", data);
+	@Test
+	public void testStreamingDistributedCache() throws Exception {
+		String textPath = createTempFile("count.txt", data);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.registerCachedFile(textPath, "cache_test");
+		env.readTextFile(textPath).flatMap(new WordChecker());
+		env.execute();
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
+	@Test
+	public void testBatchDistributedCache() throws Exception {
+		String textPath = createTempFile("count.txt", data);
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.registerCachedFile(textPath, "cache_test");
-
-		List<Tuple1<String>> result = env
-				.readTextFile(textPath)
-				.flatMap(new WordChecker())
-				.collect();
-
-		compareResultAsTuples(result, data);
+		env.readTextFile(textPath).flatMap(new WordChecker()).count();
 	}
 
 	public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
-		private final Set<String> wordList = new HashSet<>();
+		private final List<String> wordList = new ArrayList<>();
 
 		@Override
-		public void open(Configuration conf) throws FileNotFoundException, IOException {
+		public void open(Configuration conf) throws IOException {
 			File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
 			BufferedReader reader = new BufferedReader(new FileReader(file));
 			String tempString;
@@ -83,9 +78,10 @@ public class DistributedCacheTest extends JavaProgramTestBase {
 
 		@Override
 		public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception {
-			if (wordList.contains(word)) {
-				out.collect(new Tuple1<>(word));
-			}
+			assertTrue("Unexpected word in stream! wordFromStream: " + word + ", shouldBeOneOf: " +
+				wordList.toString(), wordList.contains(word));
+
+			out.collect(new Tuple1<>(word));
 		}
 	}
 }


[5/7] flink git commit: [FLINK-6367] support custom header settings of allow origin

Posted by ch...@apache.org.
[FLINK-6367] support custom header settings of allow origin

This closes #3769.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/127f1df6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/127f1df6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/127f1df6

Branch: refs/heads/master
Commit: 127f1df642ecc332cab742bd040e67613e4bad25
Parents: 49d0e43
Author: shijinkui <sh...@huawei.com>
Authored: Tue Apr 25 20:09:15 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                                          | 2 ++
 .../java/org/apache/flink/configuration/ConfigConstants.java  | 6 ++++++
 .../flink/runtime/webmonitor/RuntimeMonitorHandler.java       | 7 ++++++-
 .../org/apache/flink/runtime/webmonitor/WebMonitorConfig.java | 4 ++++
 .../apache/flink/runtime/webmonitor/WebRuntimeMonitor.java    | 7 ++++---
 5 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 250d92f..a3daac0 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -375,6 +375,8 @@ These parameters allow for advanced tuning. The default values are sufficient wh
 
 - `jobmanager.web.ssl.enabled`: Enable https access to the web frontend. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: `true`).
 
+- `jobmanager.web.access-control-allow-origin`: Value of the `Access-Control-Allow-Origin` header sent with all responses from the web frontend (DEFAULT: `*`).
+
 ### File Systems
 
 The parameters define the behavior of tasks that create result files.

http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 61c1b27..975a3d4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1333,6 +1333,12 @@ public final class ConfigConstants {
 		key("jobmanager.web.address")
 			.noDefaultValue();
 
+	/** The config parameter defining the Access-Control-Allow-Origin header for all
+	 * responses from the web-frontend. */
+	public static final ConfigOption<String> JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN =
+		key("jobmanager.web.access-control-allow-origin")
+			.defaultValue("*");
+
 	/** The config key for the port of the JobManager web frontend.
 	 * Setting this value to {@code -1} disables the web frontend. */
 	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;

http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 3d7fbed..90fe2e0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -66,14 +66,19 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 	private final RequestHandler handler;
 
+	private final String allowOrigin;
+
 	public RuntimeMonitorHandler(
+			WebMonitorConfig cfg,
 			RequestHandler handler,
 			JobManagerRetriever retriever,
 			Future<String> localJobManagerAddressFuture,
 			FiniteDuration timeout,
 			boolean httpsEnabled) {
+
 		super(retriever, localJobManagerAddressFuture, timeout, httpsEnabled);
 		this.handler = checkNotNull(handler);
+		this.allowOrigin = cfg.getAllowOrigin();
 	}
 
 	@Override
@@ -122,7 +127,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			LOG.debug("Error while handling request", e);
 		}
 
-		response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+		response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
 		// Content-Encoding:utf-8
 		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, ENCODING.name());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index 18fc5e8..11e94b0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -78,4 +78,8 @@ public class WebMonitorConfig {
 			ConfigConstants.JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY,
 			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED);
 	}
+
+	public String getAllowOrigin() {
+		return config.getString(ConfigConstants.JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/127f1df6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index c1e6229..f83fa27 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -136,6 +136,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private final BackPressureStatsTracker backPressureStatsTracker;
 
+	private final WebMonitorConfig cfg;
+
 	private AtomicBoolean cleanedUp = new AtomicBoolean();
 
 	private ExecutorService executorService;
@@ -150,8 +152,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
 		this.timeout = AkkaUtils.getTimeout(config);
 		this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(config), timeout);
-		
-		final WebMonitorConfig cfg = new WebMonitorConfig(config);
+		this.cfg = new WebMonitorConfig(config);
 
 		final String configuredAddress = cfg.getWebFrontendAddress();
 
@@ -515,7 +516,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	// ------------------------------------------------------------------------
 
 	private RuntimeMonitorHandler handler(RequestHandler handler) {
-		return new RuntimeMonitorHandler(handler, retriever, jobManagerAddressPromise.future(), timeout,
+		return new RuntimeMonitorHandler(cfg, handler, retriever, jobManagerAddressPromise.future(), timeout,
 			serverSSLContext !=  null);
 	}
 


[2/7] flink git commit: [hotfix] [py] Code cleanup - Functions

Posted by ch...@apache.org.
[hotfix] [py] Code cleanup - Functions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36950629
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36950629
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36950629

Branch: refs/heads/master
Commit: 369506291a483158bd041f31ebcfc6e7fe86a263
Parents: 3655dee
Author: zentol <ch...@apache.org>
Authored: Thu Apr 20 13:26:38 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../flink/python/api/functions/util/IdentityGroupReduce.java       | 2 ++
 .../org/apache/flink/python/api/functions/util/SerializerMap.java  | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36950629/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
index 1e7bbe6..32fd22a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
@@ -12,12 +12,14 @@
  */
 package org.apache.flink.python.api.functions.util;
 
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 
 /*
 Utility function to group and sort data.
 */
+@ForwardedFields("*->*")
 public class IdentityGroupReduce<IN> implements GroupReduceFunction<IN, IN> {
 	@Override
 	public final void reduce(Iterable<IN> values, Collector<IN> out) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/36950629/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
index 9c39e5f..116efd4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
@@ -20,7 +20,7 @@ import org.apache.flink.python.api.streaming.util.SerializationUtils.Serializer;
 Utility function to serialize values, usually directly from data sources.
 */
 public class SerializerMap<IN> implements MapFunction<IN, byte[]> {
-	private Serializer<IN> serializer = null;
+	private transient Serializer<IN> serializer;
 
 	@Override
 	@SuppressWarnings("unchecked")


[6/7] flink git commit: [hotfix] [py] Code cleanup - SerializationUtils

Posted by ch...@apache.org.
[hotfix] [py] Code cleanup - SerializationUtils


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/154bb3bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/154bb3bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/154bb3bb

Branch: refs/heads/master
Commit: 154bb3bb09dcc6dbc3dd7e1a6984ff5afd41431b
Parents: 3695062
Author: zentol <ch...@apache.org>
Authored: Thu Apr 20 13:27:15 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../api/streaming/plan/PythonPlanSender.java    |  1 -
 .../api/streaming/util/SerializationUtils.java  | 67 ++++++++++++--------
 2 files changed, 40 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/154bb3bb/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
index 8b8366a..331c67e 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -28,7 +28,6 @@ public class PythonPlanSender {
 		this.output = new DataOutputStream(output);
 	}
 
-	@SuppressWarnings("unchecked")
 	public void sendRecord(Object record) throws IOException {
 		byte[] data = SerializationUtils.getSerializer(record).serialize(record);
 		output.write(data);

http://git-wip-us.apache.org/repos/asf/flink/blob/154bb3bb/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
index f228327..721746a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -20,21 +20,22 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
 public class SerializationUtils {
-	public static final byte TYPE_BOOLEAN = (byte) 34;
-	public static final byte TYPE_BYTE = (byte) 33;
-	public static final byte TYPE_INTEGER = (byte) 32;
-	public static final byte TYPE_LONG = (byte) 31;
-	public static final byte TYPE_DOUBLE = (byte) 30;
-	public static final byte TYPE_FLOAT = (byte) 29;
-	public static final byte TYPE_STRING = (byte) 28;
-	public static final byte TYPE_BYTES = (byte) 27;
-	public static final byte TYPE_NULL = (byte) 26;
+	public static final byte TYPE_BOOLEAN = 34;
+	public static final byte TYPE_BYTE = 33;
+	public static final byte TYPE_INTEGER = 32;
+	public static final byte TYPE_LONG = 31;
+	public static final byte TYPE_DOUBLE = 30;
+	public static final byte TYPE_FLOAT = 29;
+	public static final byte TYPE_STRING = 28;
+	public static final byte TYPE_BYTES = 27;
+	public static final byte TYPE_NULL = 26;
 
 	private enum SupportedTypes {
 		TUPLE, BOOLEAN, BYTE, BYTES, INTEGER, LONG, FLOAT, DOUBLE, STRING, NULL, CUSTOMTYPEWRAPPER
 	}
 
-	public static Serializer getSerializer(Object value) {
+	@SuppressWarnings("unchecked")
+	public static <IN> Serializer<IN> getSerializer(IN value) {
 		String className = value.getClass().getSimpleName().toUpperCase();
 		if (className.startsWith("TUPLE")) {
 			className = "TUPLE";
@@ -43,35 +44,48 @@ public class SerializationUtils {
 			className = "BYTES";
 		}
 		SupportedTypes type = SupportedTypes.valueOf(className);
+		Serializer<?> serializer;
 		switch (type) {
 			case TUPLE:
-				return new TupleSerializer((Tuple) value);
+				serializer = new TupleSerializer((Tuple) value);
+				break;
 			case BOOLEAN:
-				return new BooleanSerializer();
+				serializer = new BooleanSerializer();
+				break;
 			case BYTE:
-				return new ByteSerializer();
+				serializer = new ByteSerializer();
+				break;
 			case BYTES:
-				return new BytesSerializer();
+				serializer = new BytesSerializer();
+				break;
 			case INTEGER:
-				return new IntSerializer();
+				serializer = new IntSerializer();
+				break;
 			case LONG:
-				return new LongSerializer();
+				serializer = new LongSerializer();
+				break;
 			case STRING:
-				return new StringSerializer();
+				serializer = new StringSerializer();
+				break;
 			case FLOAT:
-				return new FloatSerializer();
+				serializer = new FloatSerializer();
+				break;
 			case DOUBLE:
-				return new DoubleSerializer();
+				serializer = new DoubleSerializer();
+				break;
 			case NULL:
-				return new NullSerializer();
+				serializer = new NullSerializer();
+				break;
 			case CUSTOMTYPEWRAPPER:
-				return new CustomTypeWrapperSerializer((CustomTypeWrapper) value);
+				serializer = new CustomTypeWrapperSerializer((CustomTypeWrapper) value);
+				break;
 			default:
 				throw new IllegalArgumentException("Unsupported Type encountered: " + type);
 		}
+		return (Serializer<IN>) serializer;
 	}
 
-	public static abstract class Serializer<IN> {
+	public abstract static class Serializer<IN> {
 		private byte[] typeInfo = null;
 
 		public byte[] serialize(IN value) {
@@ -130,7 +144,7 @@ public class SerializationUtils {
 	public static class BooleanSerializer extends Serializer<Boolean> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(Boolean value) {
-			return new byte[]{value ? (byte) 1 : (byte) 0};
+			return new byte[]{(byte) (value ? 1 : 0)};
 		}
 
 		@Override
@@ -237,7 +251,7 @@ public class SerializationUtils {
 	}
 
 	public static class TupleSerializer extends Serializer<Tuple> {
-		private final Serializer[] serializer;
+		private final Serializer<Object>[] serializer;
 
 		public TupleSerializer(Tuple value) {
 			serializer = new Serializer[value.getArity()];
@@ -246,7 +260,6 @@ public class SerializationUtils {
 			}
 		}
 
-		@SuppressWarnings("unchecked")
 		@Override
 		public byte[] serializeWithoutTypeInfo(Tuple value) {
 			ArrayList<byte[]> bits = new ArrayList<>();
@@ -269,7 +282,7 @@ public class SerializationUtils {
 		@Override
 		public void putTypeInfo(ByteBuffer buffer) {
 			buffer.put((byte) serializer.length);
-			for (Serializer s : serializer) {
+			for (Serializer<Object> s : serializer) {
 				s.putTypeInfo(buffer);
 			}
 		}
@@ -277,7 +290,7 @@ public class SerializationUtils {
 		@Override
 		public int getTypeInfoSize() {
 			int size = 1;
-			for (Serializer s : serializer) {
+			for (Serializer<Object> s : serializer) {
 				size += s.getTypeInfoSize();
 			}
 			return size;


[3/7] flink git commit: [FLINK-3709] Prevent caching of outdated suspended ExecutionGraphs

Posted by ch...@apache.org.
[FLINK-3709] Prevent caching of outdated suspended ExecutionGraphs

This closes #3709.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49d0e432
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49d0e432
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49d0e432

Branch: refs/heads/master
Commit: 49d0e4321df1853d00e26593556c34acb9bed3d2
Parents: 6aa5bad
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Tue Apr 11 19:48:52 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu May 4 12:56:39 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/webmonitor/ExecutionGraphHolder.java      | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49d0e432/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 3d0cfc0..f9faa85 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
 import org.slf4j.Logger;
@@ -67,14 +68,18 @@ public class ExecutionGraphHolder {
 	public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
 		AccessExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
-			return cached;
+			if (cached.getState() == JobStatus.SUSPENDED) {
+				cache.remove(jid);
+			} else {
+				return cached;
+			}
 		}
 
 		try {
 			if (jobManager != null) {
 				Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
 				Object result = Await.result(future, timeout);
-				
+
 				if (result instanceof JobManagerMessages.JobNotFound) {
 					return null;
 				}