Posted to commits@flink.apache.org by se...@apache.org on 2016/12/20 16:49:06 UTC

[01/10] flink git commit: [FLINK-4669] [apis] Harmonize the instantiations of local environments between Java/Scala Batch/Streaming

Repository: flink
Updated Branches:
  refs/heads/master 1220230c6 -> ab2125b82


[FLINK-4669] [apis] Harmonize the instantiations of local environments between Java/Scala Batch/Streaming


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bc1063c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bc1063c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bc1063c

Branch: refs/heads/master
Commit: 9bc1063c609d6802e809040b2ed4746a7771a43d
Parents: 570ed4e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 20 12:15:09 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:16 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/ExecutionEnvironment.java    | 41 +++++++---
 .../flink/api/scala/ExecutionEnvironment.scala  | 78 ++++++++++++------
 .../environment/StreamExecutionEnvironment.java | 61 +++++++++-----
 .../api/scala/StreamExecutionEnvironment.scala  | 84 ++++++++++++--------
 .../BatchScalaAPICompletenessTest.scala         |  1 -
 5 files changed, 176 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9bc1063c/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 0ac9ccf..709ef09 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -78,6 +78,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The ExecutionEnvironment is the context in which a program is executed. A
  * {@link LocalEnvironment} will cause execution in the current JVM, a
@@ -1219,21 +1221,28 @@ public abstract class ExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a local execution environment with enable running web UI
+	 * Creates a {@link LocalEnvironment} for local program execution that also starts the
+	 * web monitoring UI.
+	 *
+	 * <p>The local execution environment will run the program in a multi-threaded fashion in
+	 * the same JVM as the environment was created in. It will use the parallelism specified in the
+	 * parameter.
 	 *
-	 * @return [[StreamExecutionEnvironment]]
+	 * <p>If the configuration key 'jobmanager.web.port' was set in the configuration, that particular
+	 * port will be used for the web UI. Otherwise, the default port (8081) will be used.
 	 */
-	public static ExecutionEnvironment createLocalEnvWithWebUI(Configuration conf) {
+	@PublicEvolving
+	public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
+		checkNotNull(conf, "conf");
+
 		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
 			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
 			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
 		}
-
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+
 		LocalEnvironment localEnv = new LocalEnvironment(conf);
-		if (localEnv.getConfig().getParallelism() < 0) {
-			localEnv.setParallelism(defaultLocalDop);
-		}
+		localEnv.setParallelism(defaultLocalDop);
 
 		return localEnv;
 	}
@@ -1294,7 +1303,21 @@ public abstract class ExecutionEnvironment {
 		rec.setParallelism(parallelism);
 		return rec;
 	}
-	
+
+	// --------------------------------------------------------------------------------------------
+	//  Default parallelism for local execution
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the default parallelism that will be used for the local execution environment created by
+	 * {@link #createLocalEnvironment()}.
+	 * 
+	 * @return The default local parallelism
+	 */
+	public static int getDefaultLocalParallelism() {
+		return defaultLocalDop;
+	}
+
 	/**
 	 * Sets the default parallelism that will be used for the local execution environment created by
 	 * {@link #createLocalEnvironment()}.
@@ -1304,7 +1327,7 @@ public abstract class ExecutionEnvironment {
 	public static void setDefaultLocalParallelism(int parallelism) {
 		defaultLocalDop = parallelism;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Methods to control the context environment and creation of explicit environments other
 	//  than the context environment
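
A minimal sketch of how a user program calls the renamed batch entry point
after this change (the custom port value is illustrative):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    public class LocalWebUIExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // optional: choose a custom port; otherwise the default (8081) is used
            conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8082);

            ExecutionEnvironment env =
                    ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

            env.fromElements(1, 2, 3).print();
        }
    }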

http://git-wip-us.apache.org/repos/asf/flink/blob/9bc1063c/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index a61c65d..6d70438 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
 import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
 import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.Path
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
@@ -715,6 +715,27 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
 object ExecutionEnvironment {
 
   /**
+   * Sets the default parallelism that will be used for the local execution
+   * environment created by [[createLocalEnvironment()]].
+   *
+   * @param parallelism The default parallelism to use for local execution.
+   */
+  @PublicEvolving
+  def setDefaultLocalParallelism(parallelism: Int) : Unit =
+    JavaEnv.setDefaultLocalParallelism(parallelism)
+
+  /**
+   * Gets the default parallelism that will be used for the local execution environment created by
+   * [[createLocalEnvironment()]].
+   */
+  @PublicEvolving
+  def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism
+
+  // --------------------------------------------------------------------------
+  //  context environment
+  // --------------------------------------------------------------------------
+
+  /**
    * Creates an execution environment that represents the context in which the program is
    * currently executed. If the program is invoked standalone, this method returns a local
    * execution environment. If the program is invoked from within the command line client
@@ -724,17 +745,20 @@ object ExecutionEnvironment {
     new ExecutionEnvironment(JavaEnv.getExecutionEnvironment)
   }
 
+  // --------------------------------------------------------------------------
+  //  local environment
+  // --------------------------------------------------------------------------
+
   /**
-   * Creates a local execution environment. The local execution environment will run the program in
-   * a multi-threaded fashion in the same JVM as the environment was created in. The parallelism of
-   * the local environment is the number of hardware contexts (CPU cores/threads).
+   * Creates a local execution environment. The local execution environment will run the
+   * program in a multi-threaded fashion in the same JVM as the environment was created in.
+   *
+   * This method sets the environment's default parallelism to the given parameter, which
+   * defaults to the value set via [[setDefaultLocalParallelism(Int)]].
    */
-  def createLocalEnvironment(
-      parallelism: Int = Runtime.getRuntime.availableProcessors())
-      : ExecutionEnvironment = {
-    val javaEnv = JavaEnv.createLocalEnvironment()
-    javaEnv.setParallelism(parallelism)
-    new ExecutionEnvironment(javaEnv)
+  def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
+      ExecutionEnvironment = {
+    new ExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
   }
 
   /**
@@ -748,23 +772,23 @@ object ExecutionEnvironment {
   }
 
   /**
-   * Creates a local execution environment with enable running web UI
+   * Creates a [[ExecutionEnvironment]] for local program execution that also starts the
+   * web monitoring UI.
    *
-   * @return [[ExecutionEnvironment]]
+   * The local execution environment will run the program in a multi-threaded fashion in
+   * the same JVM as the environment was created in. It will use the parallelism specified in the
+   * parameter.
+   *
+   * If the configuration key 'jobmanager.web.port' was set in the configuration, that particular
+   * port will be used for the web UI. Otherwise, the default port (8081) will be used.
+   *
+   * @param config optional config for the local execution
+   * @return The created ExecutionEnvironment
    */
-  def createLocalEnvWithWebUI(confOps: Option[Configuration] = None): ExecutionEnvironment = {
-    val conf = confOps match {
-      case Some(cf) => cf
-      case None => new Configuration
-    }
-
-    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
-    if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
-      val port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT
-      conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port)
-    }
-
-    new ExecutionEnvironment(JavaEnv.createLocalEnvironment(conf))
+  @PublicEvolving
+  def createLocalEnvironmentWithWebUI(config: Configuration = null): ExecutionEnvironment = {
+    val conf: Configuration = if (config == null) new Configuration() else config
+    new ExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
   }
 
   /**
@@ -779,6 +803,10 @@ object ExecutionEnvironment {
     new ExecutionEnvironment(new CollectionEnvironment)
   }
 
+  // --------------------------------------------------------------------------
+  //  remote environment
+  // --------------------------------------------------------------------------
+
   /**
    * Creates a remote execution environment. The remote environment sends (parts of) the program to
    * a cluster for execution. Note that all file paths used in the program must be accessible from
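
A short sketch of the harmonized default: the Scala object now delegates to
the Java environment's static default, so a value set on either API is seen
by both (class name is illustrative):

    import org.apache.flink.api.java.ExecutionEnvironment;

    public class DefaultParallelismExample {
        public static void main(String[] args) {
            // Both the Java and Scala createLocalEnvironment() factories
            // now fall back to this shared static default.
            ExecutionEnvironment.setDefaultLocalParallelism(4);

            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
            // env.getParallelism() is 4 here
        }
    }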

http://git-wip-us.apache.org/repos/asf/flink/blob/9bc1063c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 6c35b09..5b4b901 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -82,6 +82,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The StreamExecutionEnvironment is the context in which a streaming program is executed. A
  * {@link LocalStreamEnvironment} will cause execution in the current JVM, a
@@ -504,18 +506,6 @@ public abstract class StreamExecutionEnvironment {
 		return config.getNumberOfExecutionRetries();
 	}
 
-	/**
-	 * Sets the default parallelism that will be used for the local execution
-	 * environment created by {@link #createLocalEnvironment()}.
-	 *
-	 * @param parallelism
-	 * 		The parallelism to use as the default local parallelism.
-	 */
-	@PublicEvolving
-	public static void setDefaultLocalParallelism(int parallelism) {
-		defaultLocalParallelism = parallelism;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	// Registry for types and serializers
 	// --------------------------------------------------------------------------------------------
@@ -1646,21 +1636,28 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a local execution environment with enable running web UI
-	 *
-	 * @return [[StreamExecutionEnvironment]]
+	 * Creates a {@link LocalStreamEnvironment} for local program execution that also starts the
+	 * web monitoring UI.
+	 * 
+	 * <p>The local execution environment will run the program in a multi-threaded fashion in
+	 * the same JVM as the environment was created in. It will use the parallelism specified in the
+	 * parameter.
+	 * 
+	 * <p>If the configuration key 'jobmanager.web.port' was set in the configuration, that particular
+	 * port will be used for the web UI. Otherwise, the default port (8081) will be used.
 	 */
-	public static StreamExecutionEnvironment createLocalEnvWithWebUI(Configuration conf) {
+	@PublicEvolving
+	public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
+		checkNotNull(conf, "conf");
+
 		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
 			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
 			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
 		}
-
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+
 		LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
-		if (localEnv.getConfig().getParallelism() < 0) {
-			localEnv.setParallelism(defaultLocalParallelism);
-		}
+		localEnv.setParallelism(defaultLocalParallelism);
 
 		return localEnv;
 	}
@@ -1745,7 +1742,29 @@ public abstract class StreamExecutionEnvironment {
 	{
 		return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
 	}
-	
+
+	/**
+	 * Gets the default parallelism that will be used for the local execution environment created by
+	 * {@link #createLocalEnvironment()}.
+	 *
+	 * @return The default local parallelism
+	 */
+	@PublicEvolving
+	public static int getDefaultLocalParallelism() {
+		return defaultLocalParallelism;
+	}
+
+	/**
+	 * Sets the default parallelism that will be used for the local execution
+	 * environment created by {@link #createLocalEnvironment()}.
+	 *
+	 * @param parallelism The parallelism to use as the default local parallelism.
+	 */
+	@PublicEvolving
+	public static void setDefaultLocalParallelism(int parallelism) {
+		defaultLocalParallelism = parallelism;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Methods to control the context and local environments for execution from packaged programs
 	// --------------------------------------------------------------------------------------------
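
The streaming API gets the matching entry point; a minimal usage sketch
(an empty Configuration means the web UI starts on the default port 8081):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StreamingLocalWebUIExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

            env.fromElements(1, 2, 3).print();
            env.execute("local job with web UI");
        }
    }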

http://git-wip-us.apache.org/repos/asf/flink/blob/9bc1063c/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index d8aa1eb..22f1264 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -23,9 +23,10 @@ import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.ExecutionEnvironment
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.state.AbstractStateBackend
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source._
@@ -502,8 +503,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     * @param filter
     *          The files to be excluded from the processing
     * @return The data stream that represents the data read from the given file
-    *
-    * @deprecated Use [[FileInputFormat#setFilesFilter(FilePathFilter)]] to set a filter and
+   * @deprecated Use [[FileInputFormat#setFilesFilter(FilePathFilter)]] to set a filter and
     * [[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]]
     */
   @PublicEvolving
@@ -677,14 +677,24 @@ object StreamExecutionEnvironment {
    * Sets the default parallelism that will be used for the local execution
    * environment created by [[createLocalEnvironment()]].
    *
-   * @param parallelism
-   * The parallelism to use as the default local parallelism.
+   * @param parallelism The default parallelism to use for local execution.
    */
   @PublicEvolving
   def setDefaultLocalParallelism(parallelism: Int) : Unit =
-    StreamExecutionEnvironment.setDefaultLocalParallelism(parallelism)
+    JavaEnv.setDefaultLocalParallelism(parallelism)
 
   /**
+   * Gets the default parallelism that will be used for the local execution environment created by
+   * [[createLocalEnvironment()]].
+   */
+  @PublicEvolving
+  def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism
+  
+  // --------------------------------------------------------------------------
+  //  context environment
+  // --------------------------------------------------------------------------
+  
+  /**
    * Creates an execution environment that represents the context in which the program is
    * currently executed. If the program is invoked standalone, this method returns a local
    * execution environment. If the program is invoked from within the command line client
@@ -694,39 +704,46 @@ object StreamExecutionEnvironment {
     new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
   }
 
+  // --------------------------------------------------------------------------
+  //  local environment
+  // --------------------------------------------------------------------------
+
   /**
-   * Creates a local execution environment. The local execution environment will run the program in
-   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
-   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
+   * Creates a local execution environment. The local execution environment will run the
+   * program in a multi-threaded fashion in the same JVM as the environment was created in.
+   *
+   * This method sets the environment's default parallelism to the given parameter, which
+   * defaults to the value set via [[setDefaultLocalParallelism(Int)]].
    */
-  def createLocalEnvironment(
-    parallelism: Int =  Runtime.getRuntime.availableProcessors()):
-  StreamExecutionEnvironment = {
+  def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
+      StreamExecutionEnvironment = {
     new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
   }
 
   /**
-   * Creates a local execution environment with enable running web UI
+   * Creates a [[StreamExecutionEnvironment]] for local program execution that also starts the
+   * web monitoring UI.
    *
-   * @param confOps optional config of Flink
-   * @return [[StreamExecutionEnvironment]]
+   * The local execution environment will run the program in a multi-threaded fashion in
+   * the same JVM as the environment was created in. It will use the parallelism specified in the
+   * parameter.
+   *
+   * If the configuration key 'jobmanager.web.port' was set in the configuration, that particular
+   * port will be used for the web UI. Otherwise, the default port (8081) will be used.
+   *
+   * @param config optional config for the local execution
+   * @return The created StreamExecutionEnvironment
    */
-  def createLocalEnvWithWebUI(confOps: Option[Configuration] = None): StreamExecutionEnvironment = {
-    val conf = confOps match {
-      case Some(cf) => cf
-      case None => new Configuration
-    }
-
-    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
-    if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
-      val port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT
-      conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port)
-    }
-
-    val parallelism = Runtime.getRuntime.availableProcessors()
-    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, conf))
+  @PublicEvolving
+  def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
+    val conf: Configuration = if (config == null) new Configuration() else config
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
   }
 
+  // --------------------------------------------------------------------------
+  //  remote environment
+  // --------------------------------------------------------------------------
+
   /**
    * Creates a remote execution environment. The remote environment sends (parts of) the program to
    * a cluster for execution. Note that all file paths used in the program must be accessible from
@@ -763,10 +780,11 @@ object StreamExecutionEnvironment {
    *                 provided in the JAR files.
    */
   def createRemoteEnvironment(
-    host: String,
-    port: Int,
-    parallelism: Int,
-    jarFiles: String*): StreamExecutionEnvironment = {
+      host: String,
+      port: Int,
+      parallelism: Int,
+      jarFiles: String*): StreamExecutionEnvironment = {
+
     val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
     javaEnv.setParallelism(parallelism)
     new StreamExecutionEnvironment(javaEnv)

http://git-wip-us.apache.org/repos/asf/flink/blob/9bc1063c/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
index 36ded06..76e8547 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
@@ -55,7 +55,6 @@ class BatchScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
        "org.apache.flink.api.java.operators.TwoInputOperator.getInput2Type",
        "org.apache.flink.api.java.ExecutionEnvironment.areExplicitEnvironmentsAllowed",
        "org.apache.flink.api.java.ExecutionEnvironment.resetContextEnvironment",
-       "org.apache.flink.api.java.ExecutionEnvironment.setDefaultLocalParallelism",
 
        // TypeHints are only needed for Java API, Scala API doesn't need them
        "org.apache.flink.api.java.operators.SingleInputUdfOperator.returns",


[09/10] flink git commit: [hotfix] [tests] Speed up CoStreamCheckpointingITCase

Posted by se...@apache.org.
[hotfix] [tests] Speed up CoStreamCheckpointingITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a26b0f08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a26b0f08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a26b0f08

Branch: refs/heads/master
Commit: a26b0f0826dd84cded650280f3b8262e628814da
Parents: 7280df4
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 20 14:53:13 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:44 2016 +0100

----------------------------------------------------------------------
 .../CoStreamCheckpointingITCase.java            | 172 +++++++++++--------
 1 file changed, 104 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a26b0f08/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index c503a1f..51a00b9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -21,17 +21,23 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Collector;
 
+import org.junit.Test;
+
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
@@ -50,10 +56,11 @@ import static org.junit.Assert.assertTrue;
  * The test triggers a failure after a while and verifies that, after completion, the
  * state reflects the "exactly once" semantics.
  */
-@SuppressWarnings("serial")
-public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
+@SuppressWarnings({"serial", "deprecation"})
+public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBase {
 
-	final long NUM_STRINGS = 10_000_000L;
+	private static final long NUM_STRINGS = 10_000L;
+	private static final int PARALLELISM = 4;
 
 	/**
 	 * Runs the following program:
@@ -62,12 +69,16 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	 *     [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
 	 */
-	@Override
-	public void testProgram(StreamExecutionEnvironment env) {
-
+	@Test
+	public void testCoStreamCheckpointingProgram() throws Exception {
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
-		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+		env.enableCheckpointing(50);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS, NUM_STRINGS / 5));
 
 		stream
 				// -------------- first vertex, chained to the source ----------------
@@ -91,10 +102,11 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 						// Do nothing here
 					}
 				});
-	}
 
-	@Override
-	public void postSubmit() {
+		TestUtils.tryExecute(env, "Fault Tolerance Test");
+
+		// validate the result
+
 		long filterSum = 0;
 		for (long l : StringRichFilterFunction.counts) {
 			filterSum += l;
@@ -115,14 +127,6 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 			countSum += l;
 		}
 
-		if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) {
-			System.err.println("Test inconclusive: Restore was never called on counting Map function.");
-		}
-
-		if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) {
-			System.err.println("Test inconclusive: Restore was never called on counting CoMap function.");
-		}
-
 		// verify that we counted exactly right
 		assertEquals(NUM_STRINGS, filterSum);
 		assertEquals(NUM_STRINGS, coMapSum);
@@ -135,41 +139,42 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
 
-	private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
-			implements ParallelSourceFunction<String>, Checkpointed<Integer> {
+	/**
+	 * A generating source that runs slowly until the first two checkpoints have completed,
+	 * and at a certain point stalls indefinitely to let those checkpoints complete.
+	 *
+	 * After the checkpoints are through, it continues at full speed.
+	 */
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+			implements Checkpointed<Integer>, CheckpointListener {
 
-		static final long[] counts = new long[PARALLELISM];
-		
-		private final long numElements;
+		private static volatile int numCompletedCheckpoints = 0;
 
-		private Random rnd;
-		private StringBuilder stringBuilder;
+		private final long numElements;
+		private final long checkpointLatestAt;
 
-		private int index;
-		private int step;
+		private int index = -1;
 
 		private volatile boolean isRunning = true;
 
-		
-		StringGeneratingSourceFunction(long numElements) {
+		StringGeneratingSourceFunction(long numElements, long checkpointLatestAt) {
 			this.numElements = numElements;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws IOException {
-			rnd = new Random();
-			stringBuilder = new StringBuilder();
-			
-			step = getRuntimeContext().getNumberOfParallelSubtasks();
-			if (index == 0) {
-				index = getRuntimeContext().getIndexOfThisSubtask();
-			}
+			this.checkpointLatestAt = checkpointLatestAt;
 		}
 
 		@Override
 		public void run(SourceContext<String> ctx) throws Exception {
+			final Random rnd = new Random();
+			final StringBuilder stringBuilder = new StringBuilder();
+
 			final Object lockingObject = ctx.getCheckpointLock();
 
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+			if (index < 0) {
+			// has not been restored, so initialize
+			index = getRuntimeContext().getIndexOfThisSubtask();
+			}
+
 			while (isRunning && index < numElements) {
 				char first = (char) ((index % 40) + 40);
 
@@ -178,23 +183,49 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 				String result = randomString(stringBuilder, rnd);
 
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
 				synchronized (lockingObject) {
 					index += step;
 					ctx.collect(result);
 				}
+
+				if (numCompletedCheckpoints < 2) {
+					// not yet completed enough checkpoints, so slow down
+					if (index < checkpointLatestAt) {
+						// mild slow down
+						Thread.sleep(1);
+					} else {
+						// wait until the checkpoints are completed
+						while (isRunning && numCompletedCheckpoints < 2) {
+							Thread.sleep(5);
+						}
+					}
+				}
 			}
 		}
 
 		@Override
-		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
-		}
-		
-		@Override
 		public void cancel() {
 			isRunning = false;
 		}
 
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+				numCompletedCheckpoints++;
+			}
+		}
+
 		private static String randomString(StringBuilder bld, Random rnd) {
 			final int len = rnd.nextInt(10) + 5;
 
@@ -205,16 +236,6 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 			return bld.toString();
 		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			index = state;
-		}
 	}
 
 	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
@@ -284,9 +305,10 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
 
-		Long count = 0L;
 		static final long[] counts = new long[PARALLELISM];
 
+		private long count = 0L;
+
 		@Override
 		public boolean filter(String value) {
 			count++;
@@ -310,10 +332,9 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	}
 
 	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
-		
+
 		static final long[] counts = new long[PARALLELISM];
-		static volatile boolean restoreCalledAtLeastOnce = false;
-		
+
 		private long count;
 
 		@Override
@@ -329,11 +350,7 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 		@Override
 		public void restoreState(Long state) {
-			restoreCalledAtLeastOnce = true;
 			count = state;
-			if (count == 0) {
-				throw new RuntimeException("Restore from beginning");
-			}
 		}
 
 		@Override
@@ -345,8 +362,7 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
-		static volatile boolean restoreCalledAtLeastOnce = false;
-		
+
 		private long count;
 
 		@Override
@@ -368,7 +384,6 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 		@Override
 		public void restoreState(Long state) {
-			restoreCalledAtLeastOnce = true;
 			count = state;
 		}
 
@@ -377,4 +392,25 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
 		}
 	}
+
+	public static class PrefixCount implements Serializable {
+
+		public String prefix;
+		public String value;
+		public long count;
+
+		@SuppressWarnings("unused")
+		public PrefixCount() {}
+
+		public PrefixCount(String prefix, String value, long count) {
+			this.prefix = prefix;
+			this.value = value;
+			this.count = count;
+		}
+
+		@Override
+		public String toString() {
+			return prefix + " / " + value;
+		}
+	}
 }
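
The speed-up hinges on the throttling pattern in StringGeneratingSourceFunction:
emit slowly until the first two checkpoints complete, then run at full speed.
Distilled into a minimal sketch (class and field names are illustrative, not
part of the test):

    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    public class ThrottledSource extends RichParallelSourceFunction<Long>
            implements CheckpointListener {

        private static volatile int completedCheckpoints = 0;
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long i = 0;
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(i++);
                }
                if (completedCheckpoints < 2) {
                    Thread.sleep(1); // stay alive long enough for checkpoints to finish
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // count on one subtask only, mirroring the test above
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                completedCheckpoints++;
            }
        }
    }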


[03/10] flink git commit: [FLINK-5367] [docs] Restored changes that were lost when merging the recent doc refactoring.

Posted by se...@apache.org.
[FLINK-5367] [docs] Restored changes that were lost when merging the recent doc refactoring.

This closes #3028


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e42a173a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e42a173a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e42a173a

Branch: refs/heads/master
Commit: e42a173a62eea6318ad78b11db430d7f34af1711
Parents: 1220230
Author: David Anderson <da...@alpinegizmo.com>
Authored: Mon Dec 19 16:26:58 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:16 2016 +0100

----------------------------------------------------------------------
 docs/dev/datastream_api.md          |   2 +-
 docs/dev/execution_configuration.md |   4 +-
 docs/dev/restart_strategies.md      | 258 +++++++++++++++++++++++++++++
 docs/dev/state.md                   |   7 +-
 docs/redirects/fault_tolerance.md   |   2 +-
 docs/setup/config.md                |   2 +-
 docs/setup/fault_tolerance.md       | 271 -------------------------------
 7 files changed, 267 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e42a173a/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 85866f7..850d8c5 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -1623,7 +1623,7 @@ for an explanation of most parameters. These parameters pertain specifically to
 
 ### Fault Tolerance
 
-The [Fault Tolerance Documentation]({{ site.baseurl }}/setup/fault_tolerance.html) describes the options and parameters to enable and configure Flink's checkpointing mechanism.
+[State & Checkpointing]({{ site.baseurl }}/dev/state#enabling-checkpointing) describes how to enable and configure Flink's checkpointing mechanism. 
 
 ### Controlling Latency
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e42a173a/docs/dev/execution_configuration.md
----------------------------------------------------------------------
diff --git a/docs/dev/execution_configuration.md b/docs/dev/execution_configuration.md
index 1f66058..50a9f76 100644
--- a/docs/dev/execution_configuration.md
+++ b/docs/dev/execution_configuration.md
@@ -49,9 +49,9 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
 
 - `getMaxParallelism()` / `setMaxParallelism(int parallelism)` Set the default maximum parallelism for the job. This setting determines the maximum degree of parallelism and specifies the upper limit for dynamic scaling.
 
-- `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used.
+- `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used. This is deprecated, use [restart strategies]({{ site.baseurl }}/dev/restart_strategies) instead.
 
-- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully been stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more.
+- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have successfully been stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more. This is deprecated, use [restart strategies]({{ site.baseurl }}/dev/restart_strategies) instead.
 
 - `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or on a pipelined manner.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e42a173a/docs/dev/restart_strategies.md
----------------------------------------------------------------------
diff --git a/docs/dev/restart_strategies.md b/docs/dev/restart_strategies.md
new file mode 100644
index 0000000..5965157
--- /dev/null
+++ b/docs/dev/restart_strategies.md
@@ -0,0 +1,258 @@
+---
+title: "Restart Strategies"
+nav-parent_id: execution
+nav-pos: 50
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink supports different restart strategies which control how the jobs are restarted in case of a failure.
+The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
+If a job is submitted with a restart strategy, this strategy overrides the cluster's default setting.
+
+The default restart strategy is set via Flink's configuration file `flink-conf.yaml`.
+The configuration parameter *restart-strategy* defines which strategy is taken.
+Per default, the no-restart strategy is used.
+When checkpointing is activated and no restart strategy has been configured, the job will be restarted infinitely often.
+See the following list of available restart strategies to learn what values are supported.
+
+Each restart strategy comes with its own set of parameters which control its behaviour.
+These values are also set in the configuration file.
+The description of each restart strategy contains more information about the respective configuration values.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 50%">Restart Strategy</th>
+      <th class="text-left">Value for restart-strategy</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>Fixed delay</td>
+        <td>fixed-delay</td>
+    </tr>
+    <tr>
+        <td>Failure rate</td>
+        <td>failure-rate</td>
+    </tr>
+    <tr>
+        <td>No restart</td>
+        <td>none</td>
+    </tr>
+  </tbody>
+</table>
+
+Apart from defining a default restart strategy, it is possible to define a specific restart strategy for each Flink job.
+This restart strategy is set programmatically by calling the `setRestartStrategy` method on the `ExecutionEnvironment`.
+Note that this also works for the `StreamExecutionEnvironment`.
+
+The following example shows how we can set a fixed delay restart strategy for our job.
+In case of a failure the system tries to restart the job 3 times and waits 10 seconds in-between successive restart attempts.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts
+  Time.of(10, TimeUnit.SECONDS) // delay
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts
+  Time.of(10, TimeUnit.SECONDS) // delay
+))
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## Fixed Delay Restart Strategy
+
+The fixed delay restart strategy attempts a given number of times to restart the job.
+If the maximum number of attempts is exceeded, the job eventually fails.
+In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
+
+This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.
+
+~~~
+restart-strategy: fixed-delay
+~~~
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Configuration Parameter</th>
+      <th class="text-left" style="width: 40%">Description</th>
+      <th class="text-left">Default Value</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><it>restart-strategy.fixed-delay.attempts</it></td>
+        <td>The number of times that Flink retries the execution before the job is declared as failed.</td>
+        <td>1</td>
+    </tr>
+    <tr>
+        <td><it>restart-strategy.fixed-delay.delay</it></td>
+        <td>Delaying the retry means that after a failed execution, the re-execution does not start immediately, but only after a certain delay. Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted.</td>
+        <td><it>akka.ask.timeout</it></td>
+    </tr>
+  </tbody>
+</table>
+
+~~~
+restart-strategy.fixed-delay.attempts: 3
+restart-strategy.fixed-delay.delay: 10 s
+~~~
+
+The fixed delay restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts
+  Time.of(10, TimeUnit.SECONDS) // delay
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts
+  Time.of(10, TimeUnit.SECONDS) // delay
+))
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## Failure Rate Restart Strategy
+
+The failure rate restart strategy restarts the job after a failure, but when the `failure rate` (failures per time interval) is exceeded, the job eventually fails.
+In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
+
+This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.
+
+~~~
+restart-strategy: failure-rate
+~~~
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Configuration Parameter</th>
+      <th class="text-left" style="width: 40%">Description</th>
+      <th class="text-left">Default Value</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><it>restart-strategy.failure-rate.max-failures-per-interval</it></td>
+        <td>Maximum number of restarts in a given time interval before failing a job</td>
+        <td>1</td>
+    </tr>
+    <tr>
+        <td><it>restart-strategy.failure-rate.failure-rate-interval</it></td>
+        <td>Time interval for measuring failure rate.</td>
+        <td>1 minute</td>
+    </tr>
+    <tr>
+        <td><it>restart-strategy.failure-rate.delay</it></td>
+        <td>Delay between two consecutive restart attempts</td>
+        <td><it>akka.ask.timeout</it></td>
+    </tr>
+  </tbody>
+</table>
+
+~~~
+restart-strategy.failure-rate.max-failures-per-interval: 3
+restart-strategy.failure-rate.failure-rate-interval: 5 min
+restart-strategy.failure-rate.delay: 10 s
+~~~
+
+The failure rate restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.failureRateRestart(
+  3, // max failures per interval
+  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
+  Time.of(10, TimeUnit.SECONDS) // delay
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.failureRateRestart(
+  3, // max failures per unit
+  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
+  Time.of(10, TimeUnit.SECONDS) // delay
+))
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## No Restart Strategy
+
+The job fails directly and no restart is attempted.
+
+~~~
+restart-strategy: none
+~~~
+
+The no restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.noRestart());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.noRestart())
+{% endhighlight %}
+</div>
+</div>
+
+## Fallback Restart Strategy
+
+The cluster-defined restart strategy is used.
+This is helpful for streaming programs that enable checkpointing.
+Per default, a fixed delay restart strategy is chosen if there is no other restart strategy defined.
+
+{% top %}
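
A sketch of the fallback in practice (the checkpoint interval is illustrative):
enabling checkpointing without setting any restart strategy leaves the
cluster-side fallback in charge, which restarts the job with a fixed delay:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FallbackStrategyExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpointing on, no explicit setRestartStrategy(): per the docs
            // above, the job will be restarted infinitely often on failure.
            env.enableCheckpointing(1000);

            env.fromElements("a", "b", "c").print();
            env.execute("fallback restart strategy");
        }
    }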

http://git-wip-us.apache.org/repos/asf/flink/blob/e42a173a/docs/dev/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/state.md b/docs/dev/state.md
index 6ed20ae..a772a03 100644
--- a/docs/dev/state.md
+++ b/docs/dev/state.md
@@ -60,9 +60,6 @@ By default, checkpointing is disabled. To enable checkpointing, call `enableChec
 
 Other parameters for checkpointing include:
 
-- *Number of retries*: The `setNumberOfExecutionRerties()` method defines how many times the job is restarted after a failure.
-  When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.
-
 - *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
   Exactly-once is preferrable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
 
@@ -358,3 +355,7 @@ Flink currently only provides processing guarantees for jobs without iterations.
 Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
 
 {% top %}
+
+## Restart Strategies
+
+Flink supports different restart strategies which control how the jobs are restarted in case of a failure. For more information, see [Restart Strategies]({{ site.baseurl }}/dev/restart_strategies).

http://git-wip-us.apache.org/repos/asf/flink/blob/e42a173a/docs/redirects/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/redirects/fault_tolerance.md b/docs/redirects/fault_tolerance.md
index 4d82187..62944d4 100644
--- a/docs/redirects/fault_tolerance.md
+++ b/docs/redirects/fault_tolerance.md
@@ -1,7 +1,7 @@
 ---
 title: "Fault Tolerance"
 layout: redirect
-redirect: /setup/fault_tolerance.html
+redirect: /dev/state.html
 permalink: /apis/streaming/fault_tolerance.html
 ---
 <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/e42a173a/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 680f4f7..5c13e43 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -158,7 +158,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.backend.rocksdb.checkpointdir`:  The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example \u2018:\u2019 (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`)
 
-- `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/fault_tolerance.md#externalized-checkpoints).
+- `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints#externalized-checkpoints).
 
 - `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e42a173a/docs/setup/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/setup/fault_tolerance.md b/docs/setup/fault_tolerance.md
deleted file mode 100644
index 500e91a..0000000
--- a/docs/setup/fault_tolerance.md
+++ /dev/null
@@ -1,271 +0,0 @@
----
-title: "Restart Strategies"
-nav-parent_id: execution
-nav-pos: 50
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Flink supports different restart strategies which control how the jobs are restarted in case of a failure.
-The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
-In case that the job is submitted with a restart strategy, this strategy overrides the cluster's default setting.
-
-The default restart strategy is set via Flink's configuration file `flink-conf.yaml`.
-The configuration parameter *restart-strategy* defines which strategy is taken.
-Per default, the no-restart strategy is used.
-See the following list of available restart strategies to learn what values are supported.
-
-Each restart strategy comes with its own set of parameters which control its behaviour.
-These values are also set in the configuration file.
-The description of each restart strategy contains more information about the respective configuration values.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 50%">Restart Strategy</th>
-      <th class="text-left">Value for restart-strategy</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>Fixed delay</td>
-        <td>fixed-delay</td>
-    </tr>
-    <tr>
-        <td>Failure rate</td>
-        <td>failure-rate</td>
-    </tr>
-    <tr>
-        <td>No restart</td>
-        <td>none</td>
-    </tr>
-  </tbody>
-</table>
-
-Apart from defining a default restart strategy, it is possible to define a specific restart strategy for each Flink job.
-This restart strategy is set programmatically by calling the `setRestartStrategy` method on the `ExecutionEnvironment`.
-Note that this also works for the `StreamExecutionEnvironment`.
-
-The following example shows how we can set a fixed delay restart strategy for our job.
-In case of a failure, the system tries to restart the job 3 times and waits 10 seconds between successive restart attempts.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-## Fixed Delay Restart Strategy
-
-The fixed delay restart strategy attempts to restart the job a given number of times.
-If the maximum number of attempts is exceeded, the job eventually fails.
-Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
-
-This strategy is enabled as the cluster default by setting the following configuration parameter in `flink-conf.yaml`:
-
-~~~
-restart-strategy: fixed-delay
-~~~
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Configuration Parameter</th>
-      <th class="text-left" style="width: 40%">Description</th>
-      <th class="text-left">Default Value</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td><i>restart-strategy.fixed-delay.attempts</i></td>
-        <td>Number of restart attempts</td>
-        <td>1</td>
-    </tr>
-    <tr>
-        <td><i>restart-strategy.fixed-delay.delay</i></td>
-        <td>Delay between two consecutive restart attempts</td>
-        <td><i>akka.ask.timeout</i></td>
-    </tr>
-  </tbody>
-</table>
-
-~~~
-restart-strategy.fixed-delay.attempts: 3
-restart-strategy.fixed-delay.delay: 10 s
-~~~
-
-The fixed delay restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-### Restart Attempts
-
-The number of times that Flink retries the execution before the job is declared failed is configurable via the *restart-strategy.fixed-delay.attempts* parameter.
-
-The default value is **1**.
-
-### Retry Delays
-
-Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start immediately, but only after a certain delay.
-
-Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted.
-
-The default value is the value of *akka.ask.timeout*.
-
-{% top %}
-
-## Failure Rate Restart Strategy
-
-The failure rate restart strategy restarts the job after a failure, but when the `failure rate` (failures per time interval) is exceeded, the job eventually fails.
-In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
-
-This strategy is enabled as the cluster default by setting the following configuration parameter in `flink-conf.yaml`:
-
-~~~
-restart-strategy: failure-rate
-~~~
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Configuration Parameter</th>
-      <th class="text-left" style="width: 40%">Description</th>
-      <th class="text-left">Default Value</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td><i>restart-strategy.failure-rate.max-failures-per-interval</i></td>
-        <td>Maximum number of restarts in the given time interval before failing a job</td>
-        <td>1</td>
-    </tr>
-    <tr>
-        <td><i>restart-strategy.failure-rate.failure-rate-interval</i></td>
-        <td>Time interval for measuring failure rate.</td>
-        <td>1 minute</td>
-    </tr>
-    <tr>
-        <td><i>restart-strategy.failure-rate.delay</i></td>
-        <td>Delay between two consecutive restart attempts</td>
-        <td><i>akka.ask.timeout</i></td>
-    </tr>
-  </tbody>
-</table>
-
-~~~
-restart-strategy.failure-rate.max-failures-per-interval: 3
-restart-strategy.failure-rate.failure-rate-interval: 5 min
-restart-strategy.failure-rate.delay: 10 s
-~~~
-
-The failure rate restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.failureRateRestart(
-  3, // max failures per interval
-  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.failureRateRestart(
-  3, // max failures per interval
-  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-## No Restart Strategy
-
-The job fails directly and no restart is attempted.
-
-~~~
-restart-strategy: none
-~~~
-
-The no restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.noRestart());
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.noRestart())
-{% endhighlight %}
-</div>
-</div>
-
-## Fallback Restart Strategy
-
-The cluster-defined restart strategy is used.
-This is helpful for streaming programs which enable checkpointing.
-By default, a fixed delay restart strategy is chosen if no other restart strategy is defined.
-
-{% top %}


[10/10] flink git commit: [FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeService

Posted by se...@apache.org.
[FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeService

The LatencyMarksEmitter class now uses the StreamTask's ProcessingTimeService to schedule
latency mark emission. For that, the ProcessingTimeService was extended with the method
scheduleAtFixedRate for scheduling repeated tasks. The latency mark emission is such a
repeated task.

This closes #3008.
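
For illustration, a minimal sketch of how a repeated task can be registered with the
extended service (hypothetical caller code; only the scheduleAtFixedRate(callback,
initialDelay, period) signature and the checkpoint-lock behavior shown in the diffs
below are assumed):

    ProcessingTimeService timeService = getProcessingTimeService();

    // schedule a callback immediately and then every 100 ms of processing time;
    // callbacks are invoked under the checkpointing lock, so no extra
    // synchronization is needed when emitting elements from the callback
    ScheduledFuture<?> repeatedTask = timeService.scheduleAtFixedRate(
        new ProcessingTimeCallback() {
            @Override
            public void onProcessingTime(long timestamp) throws Exception {
                // periodic work, e.g. emitting a latency marker
            }
        },
        0L,     // initial delay in milliseconds
        100L);  // period in milliseconds

    // cancelling the returned future stops the periodic execution
    repeatedTask.cancel(true);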


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab2125b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab2125b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab2125b8

Branch: refs/heads/master
Commit: ab2125b82ed10389dafecf1712efc1f8fb977c11
Parents: a26b0f0
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 14 14:53:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:03:01 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/operators/StreamSource.java   |  48 +++--
 .../runtime/tasks/ProcessingTimeService.java    |  10 +
 .../tasks/SystemProcessingTimeService.java      |  68 +++++++
 .../tasks/TestProcessingTimeService.java        | 156 +++++++++------
 .../operators/HeapInternalTimerServiceTest.java |  36 ++--
 .../operators/StreamSourceOperatorTest.java     |  85 ++++++---
 .../TestProcessingTimeServiceTest.java          |   6 +-
 .../tasks/SystemProcessingTimeServiceTest.java  | 188 ++++++++++++++++++-
 8 files changed, 474 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 5a16db0..84330b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -23,11 +23,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 /**
  * {@link StreamOperator} for streaming sources.
@@ -62,8 +61,12 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 
 		LatencyMarksEmitter latencyEmitter = null;
 		if(getExecutionConfig().isLatencyTrackingEnabled()) {
-			latencyEmitter = new LatencyMarksEmitter<>(lockingObject, collector, getExecutionConfig().getLatencyTrackingInterval(),
-					getOperatorConfig().getVertexID(), getRuntimeContext().getIndexOfThisSubtask());
+			latencyEmitter = new LatencyMarksEmitter<>(
+				getProcessingTimeService(),
+				collector,
+				getExecutionConfig().getLatencyTrackingInterval(),
+				getOperatorConfig().getVertexID(),
+				getRuntimeContext().getIndexOfThisSubtask());
 		}
 		
 		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
@@ -121,28 +124,35 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	}
 
 	private static class LatencyMarksEmitter<OUT> {
-		private final ScheduledExecutorService scheduleExecutor;
 		private final ScheduledFuture<?> latencyMarkTimer;
 
-		public LatencyMarksEmitter(final Object lockingObject, final Output<StreamRecord<OUT>> output, long latencyTrackingInterval, final int vertexID, final int subtaskIndex) {
-			this.scheduleExecutor = Executors.newScheduledThreadPool(1);
-			this.latencyMarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
-				@Override
-				public void run() {
-					try {
-						synchronized (lockingObject) {
-							output.emitLatencyMarker(new LatencyMarker(System.currentTimeMillis(), vertexID, subtaskIndex));
+		public LatencyMarksEmitter(
+				final ProcessingTimeService processingTimeService,
+				final Output<StreamRecord<OUT>> output,
+				long latencyTrackingInterval,
+				final int vertexID,
+				final int subtaskIndex) {
+
+			latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
+				new ProcessingTimeCallback() {
+					@Override
+					public void onProcessingTime(long timestamp) throws Exception {
+						try {
+							// ProcessingTimeService callbacks are executed under the checkpointing lock
+							output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));
+						} catch (Throwable t) {
+							// we catch the Throwables here so that we don't trigger the processing
+							// timer services async exception handler
+							LOG.warn("Error while emitting latency marker.", t);
 						}
-					} catch (Throwable t) {
-						LOG.warn("Error while emitting latency marker", t);
 					}
-				}
-			}, 0, latencyTrackingInterval, TimeUnit.MILLISECONDS);
+				},
+				0L,
+				latencyTrackingInterval);
 		}
 
 		public void close() {
 			latencyMarkTimer.cancel(true);
-			scheduleExecutor.shutdownNow();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index f64bead..240aba8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -56,6 +56,16 @@ public abstract class ProcessingTimeService {
 	public abstract ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);
 
 	/**
+	 * Registers a task to be executed repeatedly at a fixed rate.
+	 *
+	 * @param callback to be executed after the initial delay and then after each period
+	 * @param initialDelay initial delay before the callback is executed for the first time
+	 * @param period interval at which the callback is repeatedly executed after the initial delay
+	 * @return Scheduled future representing the task to be executed repeatedly
+	 */
+	public abstract ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period);
+
+	/**
 	 * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise.
 	 */
 	public abstract boolean isTerminated();

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 071dbce..abcb19b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 import java.util.concurrent.BlockingQueue;
@@ -124,6 +125,33 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 	}
 
 	@Override
+	public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
+		long nextTimestamp = getCurrentProcessingTime() + initialDelay;
+
+		// we directly try to register the timer and only react to the status on exception
+		// that way we save unnecessary volatile accesses for each timer
+		try {
+			return timerService.scheduleAtFixedRate(
+				new RepeatedTriggerTask(task, checkpointLock, callback, nextTimestamp, period),
+				initialDelay,
+				period,
+				TimeUnit.MILLISECONDS);
+		} catch (RejectedExecutionException e) {
+			final int status = this.status.get();
+			if (status == STATUS_QUIESCED) {
+				return new NeverCompleteFuture(initialDelay);
+			}
+			else if (status == STATUS_SHUTDOWN) {
+				throw new IllegalStateException("Timer service is shut down");
+			}
+			else {
+				// something else happened, so propagate the exception
+				throw e;
+			}
+		}
+	}
+
+	@Override
 	public boolean isTerminated() {
 		return status.get() == STATUS_SHUTDOWN;
 	}
@@ -196,6 +224,46 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 		}
 	}
 
+	/**
+	 * Internal task which is repeatedly called by the processing time service.
+	 */
+	private static final class RepeatedTriggerTask implements Runnable {
+		private final Object lock;
+		private final ProcessingTimeCallback target;
+		private final long period;
+		private final AsyncExceptionHandler exceptionHandler;
+
+		private long nextTimestamp;
+
+		private RepeatedTriggerTask(
+				AsyncExceptionHandler exceptionHandler,
+				Object lock,
+				ProcessingTimeCallback target,
+				long nextTimestamp,
+				long period) {
+			this.lock = Preconditions.checkNotNull(lock);
+			this.target = Preconditions.checkNotNull(target);
+			this.period = period;
+			this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
+
+			this.nextTimestamp = nextTimestamp;
+		}
+
+		@Override
+		public void run() {
+			try {
+				synchronized (lock) {
+					target.onProcessingTime(nextTimestamp);
+				}
+
+				nextTimestamp += period;
+			} catch (Throwable t) {
+				TimerException asyncException = new TimerException(t);
+				exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class NeverCompleteFuture implements ScheduledFuture<Object> {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 2ca287a..3c33ad3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -17,18 +17,19 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.ArrayList;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * This is a {@link ProcessingTimeService} used <b>strictly for testing</b> the
@@ -36,38 +37,38 @@ import java.util.concurrent.TimeoutException;
  * */
 public class TestProcessingTimeService extends ProcessingTimeService {
 
-	private volatile long currentTime = 0;
+	private volatile long currentTime = 0L;
 
 	private volatile boolean isTerminated;
 	private volatile boolean isQuiesced;
 
 	// sorts the timers by timestamp so that they are processed in the correct order.
-	private final Map<Long, List<ScheduledTimerFuture>> registeredTasks = new TreeMap<>();
+	private final PriorityQueue<Tuple2<Long, CallbackTask>> priorityQueue;
 
+	public TestProcessingTimeService() {
+		this.priorityQueue = new PriorityQueue<>(16, new Comparator<Tuple2<Long, CallbackTask>>() {
+			@Override
+			public int compare(Tuple2<Long, CallbackTask> o1, Tuple2<Long, CallbackTask> o2) {
+				return Long.compare(o1.f0, o2.f0);
+			}
+		});
+	}
 	
 	public void setCurrentTime(long timestamp) throws Exception {
 		this.currentTime = timestamp;
 
 		if (!isQuiesced) {
-			// decide which timers to fire and put them in a list
-			// we do not fire them here to be able to accommodate timers
-			// that register other timers.
-	
-			Iterator<Map.Entry<Long, List<ScheduledTimerFuture>>> it = registeredTasks.entrySet().iterator();
-			List<Map.Entry<Long, List<ScheduledTimerFuture>>> toRun = new ArrayList<>();
-			while (it.hasNext()) {
-				Map.Entry<Long, List<ScheduledTimerFuture>> t = it.next();
-				if (t.getKey() <= this.currentTime) {
-					toRun.add(t);
-					it.remove();
-				}
-			}
-	
-			// now do the actual firing.
-			for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) {
-				long now = tasks.getKey();
-				for (ScheduledTimerFuture task: tasks.getValue()) {
-					task.getProcessingTimeCallback().onProcessingTime(now);
+			while (!priorityQueue.isEmpty() && currentTime >= priorityQueue.peek().f0) {
+				Tuple2<Long, CallbackTask> entry = priorityQueue.poll();
+
+				CallbackTask callbackTask = entry.f1;
+
+				if (!callbackTask.isDone()) {
+					callbackTask.onProcessingTime(entry.f0);
+
+					if (callbackTask instanceof PeriodicCallbackTask) {
+						priorityQueue.offer(Tuple2.of(((PeriodicCallbackTask)callbackTask).nextTimestamp(entry.f0), callbackTask));
+					}
 				}
 			}
 		}
@@ -84,27 +85,38 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			throw new IllegalStateException("terminated");
 		}
 		if (isQuiesced) {
-			return new ScheduledTimerFuture(null, -1);
+			return new CallbackTask(null);
 		}
 
+		CallbackTask callbackTask = new CallbackTask(target);
+
 		if (timestamp <= currentTime) {
 			try {
-				target.onProcessingTime(timestamp);
+				callbackTask.onProcessingTime(timestamp);
 			} catch (Exception e) {
 				throw new RuntimeException(e);
 			}
+		} else {
+			priorityQueue.offer(Tuple2.of(timestamp, callbackTask));
 		}
 
-		ScheduledTimerFuture result = new ScheduledTimerFuture(target, timestamp);
+		return callbackTask;
+	}
 
-		List<ScheduledTimerFuture> tasks = registeredTasks.get(timestamp);
-		if (tasks == null) {
-			tasks = new ArrayList<>();
-			registeredTasks.put(timestamp, tasks);
+	@Override
+	public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
+		if (isTerminated) {
+			throw new IllegalStateException("terminated");
 		}
-		tasks.add(result);
+		if (isQuiesced) {
+			return new CallbackTask(null);
+		}
+
+		PeriodicCallbackTask periodicCallbackTask = new PeriodicCallbackTask(callback, period);
 
-		return result;
+		priorityQueue.offer(Tuple2.<Long, CallbackTask>of(currentTime + initialDelay, periodicCallbackTask));
+
+		return periodicCallbackTask;
 	}
 
 	@Override
@@ -116,7 +128,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 	public void quiesceAndAwaitPending() {
 		if (!isTerminated) {
 			isQuiesced = true;
-			registeredTasks.clear();
+			priorityQueue.clear();
 		}
 	}
 
@@ -125,35 +137,46 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 		this.isTerminated = true;
 	}
 
-	public int getNumRegisteredTimers() {
+	public int getNumActiveTimers() {
 		int count = 0;
-		for (List<ScheduledTimerFuture> tasks: registeredTasks.values()) {
-			count += tasks.size();
+
+		for (Tuple2<Long, CallbackTask> entry : priorityQueue) {
+			if (!entry.f1.isDone()) {
+				count++;
+			}
 		}
+
 		return count;
 	}
 
-	public Set<Long> getRegisteredTimerTimestamps() {
+	public Set<Long> getActiveTimerTimestamps() {
 		Set<Long> actualTimestamps = new HashSet<>();
-		for (List<ScheduledTimerFuture> timerFutures : registeredTasks.values()) {
-			for (ScheduledTimerFuture timer : timerFutures) {
-				actualTimestamps.add(timer.getTimestamp());
+
+		for (Tuple2<Long, CallbackTask> entry : priorityQueue) {
+			if (!entry.f1.isDone()) {
+				actualTimestamps.add(entry.f0);
 			}
 		}
+
 		return actualTimestamps;
 	}
 
 	// ------------------------------------------------------------------------
 
-	private class ScheduledTimerFuture implements ScheduledFuture<Object> {
+	private static class CallbackTask implements ScheduledFuture<Object> {
 
-		private final ProcessingTimeCallback processingTimeCallback;
+		protected final ProcessingTimeCallback processingTimeCallback;
 
-		private final long timestamp;
+		private AtomicReference<CallbackTaskState> state = new AtomicReference<>(CallbackTaskState.CREATED);
 
-		public ScheduledTimerFuture(ProcessingTimeCallback processingTimeCallback, long timestamp) {
+		private CallbackTask(ProcessingTimeCallback processingTimeCallback) {
 			this.processingTimeCallback = processingTimeCallback;
-			this.timestamp = timestamp;
+		}
+
+		public void onProcessingTime(long timestamp) throws Exception {
+			processingTimeCallback.onProcessingTime(timestamp);
+
+			state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.DONE);
 		}
 
 		@Override
@@ -168,21 +191,17 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 
 		@Override
 		public boolean cancel(boolean mayInterruptIfRunning) {
-			List<ScheduledTimerFuture> scheduledTimerFutures = registeredTasks.get(timestamp);
-			if (scheduledTimerFutures != null) {
-				scheduledTimerFutures.remove(this);
-			}
-			return true;
+			return state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.CANCELLED);
 		}
 
 		@Override
 		public boolean isCancelled() {
-			throw new UnsupportedOperationException();
+			return state.get() == CallbackTaskState.CANCELLED;
 		}
 
 		@Override
 		public boolean isDone() {
-			throw new UnsupportedOperationException();
+			return state.get() != CallbackTaskState.CREATED;
 		}
 
 		@Override
@@ -195,12 +214,31 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			throw new UnsupportedOperationException();
 		}
 
-		public ProcessingTimeCallback getProcessingTimeCallback() {
-			return processingTimeCallback;
+		enum CallbackTaskState {
+			CREATED,
+			CANCELLED,
+			DONE
+		}
+	}
+
+	private static class PeriodicCallbackTask extends CallbackTask {
+
+		private final long period;
+
+		private PeriodicCallbackTask(ProcessingTimeCallback processingTimeCallback, long period) {
+			super(processingTimeCallback);
+			Preconditions.checkArgument(period > 0L, "The period must be greater than 0.");
+
+			this.period = period;
+		}
+
+		@Override
+		public void onProcessingTime(long timestamp) throws Exception {
+			processingTimeCallback.onProcessingTime(timestamp);
 		}
 
-		public long getTimestamp() {
-			return timestamp;
+		public long nextTimestamp(long currentTimestamp) {
+			return currentTimestamp + period;
 		}
 	}
 }
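
As a usage sketch of the reworked test service (hypothetical test code, using only
methods visible in this diff; it must run in a test method declared to throw
Exception, since setCurrentTime can throw):

    TestProcessingTimeService tp = new TestProcessingTimeService();

    // register a periodic callback: first firing at the current time
    // (initial delay 0), then every 10 time units
    ScheduledFuture<?> timer = tp.scheduleAtFixedRate(
        new ProcessingTimeCallback() {
            @Override
            public void onProcessingTime(long timestamp) throws Exception {
                System.out.println("fired at " + timestamp);
            }
        }, 0L, 10L);

    // advancing the virtual clock fires the callback at timestamps 0, 10 and 20
    tp.setCurrentTime(25L);

    // cancelled tasks are no longer reported as active
    timer.cancel(true);
    assertEquals(0, tp.getNumActiveTimers());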

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index d753e4e..680f2ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -176,8 +176,8 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(2, timerService.numProcessingTimeTimers("hello"));
 		assertEquals(3, timerService.numProcessingTimeTimers("ciao"));
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(10L));
 
 		processingTimeService.setCurrentTime(10);
 
@@ -185,8 +185,8 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(1, timerService.numProcessingTimeTimers("hello"));
 		assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(20L));
 
 		processingTimeService.setCurrentTime(20);
 
@@ -194,18 +194,18 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(0, timerService.numProcessingTimeTimers("hello"));
 		assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(30L));
 
 		processingTimeService.setCurrentTime(30);
 
 		assertEquals(0, timerService.numProcessingTimeTimers());
 
-		assertEquals(0, processingTimeService.getNumRegisteredTimers());
+		assertEquals(0, processingTimeService.getNumActiveTimers());
 
 		timerService.registerProcessingTimeTimer("ciao", 40);
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertEquals(1, processingTimeService.getNumActiveTimers());
 	}
 
 	/**
@@ -233,15 +233,15 @@ public class HeapInternalTimerServiceTest {
 
 		assertEquals(1, timerService.numProcessingTimeTimers());
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(20L));
 
 		timerService.registerProcessingTimeTimer("ciao", 10);
 
 		assertEquals(2, timerService.numProcessingTimeTimers());
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(10L));
 	}
 
 	/**
@@ -266,8 +266,8 @@ public class HeapInternalTimerServiceTest {
 
 		assertEquals(1, timerService.numProcessingTimeTimers());
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(10L));
 
 		doAnswer(new Answer<Object>() {
 			@Override
@@ -279,8 +279,8 @@ public class HeapInternalTimerServiceTest {
 
 		processingTimeService.setCurrentTime(10);
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(20L));
 
 		doAnswer(new Answer<Object>() {
 			@Override
@@ -294,8 +294,8 @@ public class HeapInternalTimerServiceTest {
 
 		assertEquals(1, timerService.numProcessingTimeTimers());
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(30L));
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index e600420..b153de9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -47,6 +46,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -181,42 +181,52 @@ public class StreamSourceOperatorTest {
 	 */
 	@Test
 	public void testLatencyMarkEmission() throws Exception {
-		final long now = System.currentTimeMillis();
-
 		final List<StreamElement> output = new ArrayList<>();
 
+		final long maxProcessingTime = 100L;
+		final long latencyMarkInterval = 10L;
+
+		final TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
+		testProcessingTimeService.setCurrentTime(0L);
+		final List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime);
+
 		// regular stream source operator
-		final StoppableStreamSource<String, InfiniteSource<String>> operator =
-				new StoppableStreamSource<>(new InfiniteSource<String>());
+		final StreamSource<Long, ProcessingTimeServiceSource> operator =
+				new StreamSource<>(new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
 
 		// emit latency marks every 10 milliseconds.
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10);
-
-		// trigger an async cancel in a bit
-		new Thread("canceler") {
-			@Override
-			public void run() {
-				try {
-					Thread.sleep(200);
-				} catch (InterruptedException ignored) {}
-				operator.stop();
-			}
-		}.start();
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, latencyMarkInterval, testProcessingTimeService);
 
 		// run and wait to be stopped
-		operator.run(new Object(), new CollectorOutput<String>(output));
+		operator.run(new Object(), new CollectorOutput<Long>(output));
+
+		int numberLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;
+
+		assertEquals(
+			numberLatencyMarkers + 1, // + 1 is the final watermark element
+			output.size());
 
-		// ensure that there has been some output
-		assertTrue(output.size() > 0);
-		// and that its only latency markers
-		for(StreamElement se: output) {
+		long timestamp = 0L;
+
+		int i = 0;
+		// check that it is only latency markers, plus a final watermark
+		for (; i < output.size() - 1; i++) {
+			StreamElement se = output.get(i);
 			Assert.assertTrue(se.isLatencyMarker());
 			Assert.assertEquals(-1, se.asLatencyMarker().getVertexID());
 			Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex());
-			Assert.assertTrue(se.asLatencyMarker().getMarkedTime() >= now);
+			Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp);
+
+			timestamp += latencyMarkInterval;
 		}
+
+		Assert.assertTrue(output.get(i).isWatermark());
 	}
 
+	@Test
+	public void testLatencyMarksEmitterLifecycleIntegration() {
+
+	}
 
 	@Test
 	public void testAutomaticWatermarkContext() throws Exception {
@@ -341,4 +351,33 @@ public class StreamSourceOperatorTest {
 			running = false;
 		}
 	}
+
+	private static final class ProcessingTimeServiceSource implements SourceFunction<Long> {
+
+		private final TestProcessingTimeService processingTimeService;
+		private final List<Long> processingTimes;
+
+		private boolean cancelled = false;
+
+		private ProcessingTimeServiceSource(TestProcessingTimeService processingTimeService, List<Long> processingTimes) {
+			this.processingTimeService = processingTimeService;
+			this.processingTimes = processingTimes;
+		}
+
+		@Override
+		public void run(SourceContext<Long> ctx) throws Exception {
+			for (Long processingTime : processingTimes) {
+				if (cancelled) {
+					break;
+				}
+
+				processingTimeService.setCurrentTime(processingTime);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			cancelled = true;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index cd1f253..2903758 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -76,13 +76,13 @@ public class TestProcessingTimeServiceTest {
 			}
 		});
 
-		assertEquals(2, tp.getNumRegisteredTimers());
+		assertEquals(2, tp.getNumActiveTimers());
 
 		tp.setCurrentTime(35);
-		assertEquals(1, tp.getNumRegisteredTimers());
+		assertEquals(1, tp.getNumActiveTimers());
 
 		tp.setCurrentTime(40);
-		assertEquals(0, tp.getNumRegisteredTimers());
+		assertEquals(0, tp.getNumActiveTimers());
 
 		tp.shutdownService();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 766b313..50e438c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -21,8 +21,10 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -34,7 +36,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class SystemProcessingTimeServiceTest {
+public class SystemProcessingTimeServiceTest extends TestLogger {
 
 	@Test
 	public void testTriggerHoldsLock() throws Exception {
@@ -70,6 +72,134 @@ public class SystemProcessingTimeServiceTest {
 		}
 	}
 
+	/**
+	 * Tests that the callback scheduled at a fixed rate is invoked while holding the given lock.
+	 */
+	@Test
+	public void testScheduleAtFixedRateHoldsLock() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+			new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		final OneShotLatch awaitCallback = new OneShotLatch();
+
+		try {
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// schedule something
+			ScheduledFuture<?> future = timer.scheduleAtFixedRate(
+				new ProcessingTimeCallback() {
+					@Override
+					public void onProcessingTime(long timestamp) {
+						assertTrue(Thread.holdsLock(lock));
+
+						awaitCallback.trigger();
+					}
+				},
+				0L,
+				100L);
+
+			// wait until the first execution is active
+			awaitCallback.await();
+
+			// cancel periodic callback
+			future.cancel(true);
+
+			// check that no asynchronous error was reported
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	/**
+	 * Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually triggered multiple
+	 * times.
+	 */
+	@Test(timeout=10000)
+	public void testScheduleAtFixedRate() throws Exception {
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final long period = 10L;
+		final int countDown = 3;
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+			new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		final CountDownLatch countDownLatch = new CountDownLatch(countDown);
+
+		try {
+			timer.scheduleAtFixedRate(new ProcessingTimeCallback() {
+				@Override
+				public void onProcessingTime(long timestamp) throws Exception {
+					countDownLatch.countDown();
+				}
+			}, 0L, period);
+
+			countDownLatch.await();
+
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+
+		} finally {
+			timer.shutdownService();
+		}
+	}
+
+	/**
+	 * Tests that quiescing the SystemProcessingTimeService also cancels a future that was
+	 * scheduled at a fixed rate.
+	 */
+	@Test
+	public void testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture() throws Exception {
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final long period = 10L;
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+			new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			ScheduledFuture<?> scheduledFuture = timer.scheduleAtFixedRate(new ProcessingTimeCallback() {
+				@Override
+				public void onProcessingTime(long timestamp) throws Exception {
+				}
+			}, 0L, period);
+
+			assertFalse(scheduledFuture.isDone());
+
+			// this should cancel our future
+			timer.quiesceAndAwaitPending();
+
+			assertTrue(scheduledFuture.isCancelled());
+
+			scheduledFuture = timer.scheduleAtFixedRate(new ProcessingTimeCallback() {
+				@Override
+				public void onProcessingTime(long timestamp) throws Exception {
+					throw new Exception("Test exception.");
+				}
+			}, 0L, 100L);
+
+			assertNotNull(scheduledFuture);
+
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+
+		} finally {
+			timer.shutdownService();
+		}
+	}
+
 	@Test
 	public void testImmediateShutdown() throws Exception {
 
@@ -114,6 +244,21 @@ public class SystemProcessingTimeServiceTest {
 				// expected
 			}
 
+			try {
+				timer.scheduleAtFixedRate(
+					new ProcessingTimeCallback() {
+						@Override
+						public void onProcessingTime(long timestamp) {}
+					},
+					0L,
+					100L);
+
+				fail("should result in an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
 			// obviously, we have an asynchronous interrupted exception
 			assertNotNull(errorRef.get());
 			assertTrue(errorRef.get().getCause() instanceof InterruptedException);
@@ -206,6 +351,18 @@ public class SystemProcessingTimeServiceTest {
 
 			assertEquals(0, timer.getNumTasksScheduled());
 
+			future = timer.scheduleAtFixedRate(
+				new ProcessingTimeCallback() {
+					@Override
+					public void onProcessingTime(long timestamp) throws Exception {}
+				}, 10000000000L, 50L);
+
+			assertEquals(1, timer.getNumTasksScheduled());
+
+			future.cancel(false);
+
+			assertEquals(0, timer.getNumTasksScheduled());
+
 			// check that no asynchronous error was reported
 			if (errorRef.get() != null) {
 				throw new Exception(errorRef.get());
@@ -241,4 +398,33 @@ public class SystemProcessingTimeServiceTest {
 		latch.await();
 		assertTrue(exceptionWasThrown.get());
 	}
+
+	@Test
+	public void testExceptionReportingScheduleAtFixedRate() throws InterruptedException {
+		final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
+		final OneShotLatch latch = new OneShotLatch();
+		final Object lock = new Object();
+
+		ProcessingTimeService timeServiceProvider = new SystemProcessingTimeService(
+			new AsyncExceptionHandler() {
+				@Override
+				public void handleAsyncException(String message, Throwable exception) {
+					exceptionWasThrown.set(true);
+					latch.trigger();
+				}
+			}, lock);
+
+		timeServiceProvider.scheduleAtFixedRate(
+			new ProcessingTimeCallback() {
+				@Override
+				public void onProcessingTime(long timestamp) throws Exception {
+					throw new Exception("Exception in Timer");
+				}
+			},
+			0L,
+			100L);
+
+		latch.await();
+		assertTrue(exceptionWasThrown.get());
+	}
 }


[04/10] flink git commit: [FLINK-4861] [build] Package optional project artifacts

Posted by se...@apache.org.
[FLINK-4861] [build] Package optional project artifacts

Package the Flink connectors, metrics, and libraries into subdirectories
of a new opt directory in the release/snapshot tarballs.

This closes #3014


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c76baa1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c76baa1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c76baa1

Branch: refs/heads/master
Commit: 5c76baa1734303a01472afd17cfaf3442eb06c43
Parents: cefb8db
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Dec 15 15:49:07 2016 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:43 2016 +0100

----------------------------------------------------------------------
 flink-dist/pom.xml                        |  80 +++++++++++++++++++
 flink-dist/src/main/assemblies/opt.xml    | 106 +++++++++++++++++++++++++
 flink-libraries/flink-gelly-scala/pom.xml |  34 ++++++--
 flink-libraries/flink-ml/pom.xml          |  19 +++++
 4 files changed, 231 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c76baa1/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 25dc708..6e9edf1 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -154,6 +154,72 @@ under the License.
 			<artifactId>log4j</artifactId>
 			<scope>compile</scope>
 		</dependency>
+
+		<!-- the following dependencies are packaged in opt/ -->
+
+		<!-- start optional Flink metrics reporters -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-dropwizard</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-ganglia</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-graphite</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-statsd</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<!-- end optional Flink metrics reporters -->
+
+		<!-- start optional Flink libraries -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-cep_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-cep-scala_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-gelly_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-gelly-scala_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-gelly-examples_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-ml_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<!-- end optional Flink libraries -->
 	</dependencies>
 
 	<profiles>
@@ -293,6 +359,20 @@ under the License.
 							<appendAssemblyId>false</appendAssemblyId>
 						</configuration>
 					</execution>
+					<execution>
+						<id>opt</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<descriptors>
+								<descriptor>src/main/assemblies/opt.xml</descriptor>
+							</descriptors>
+							<finalName>flink-${project.version}-bin</finalName>
+							<appendAssemblyId>false</appendAssemblyId>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/5c76baa1/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
new file mode 100644
index 0000000..c6dc307
--- /dev/null
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -0,0 +1,106 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly
+		xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>opt</id>
+	<formats>
+		<format>dir</format>
+	</formats>
+
+	<includeBaseDirectory>true</includeBaseDirectory>
+	<baseDirectory>flink-${project.version}</baseDirectory>
+
+	<files>
+		<!-- CEP -->
+		<file>
+			<source>../flink-libraries/flink-cep/target/flink-cep_2.10-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-cep_2.10-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
+			<source>../flink-libraries/flink-cep-scala/target/flink-cep-scala_2.10-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-cep-scala_2.10-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<!-- Gelly -->
+		<file>
+			<source>../flink-libraries/flink-gelly/target/flink-gelly_2.10-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-gelly_2.10-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
+			<source>../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-gelly-examples_2.10-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
+			<source>../flink-libraries/flink-gelly-scala/target/flink-gelly-scala_2.10-${project.version}-jar-with-dependencies.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-gelly-scala_2.10-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<!-- ML -->
+		<file>
+			<source>../flink-libraries/flink-ml/target/flink-ml_2.10-${project.version}-jar-with-dependencies.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-ml_2.10-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<!-- Metrics -->
+		<file>
+			<source>../flink-metrics/flink-metrics-dropwizard/target/flink-metrics-dropwizard-${project.version}-jar-with-dependencies.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-metrics-dropwizard-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
+			<source>../flink-metrics/flink-metrics-ganglia/target/flink-metrics-ganglia-${project.version}-jar-with-dependencies.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-metrics-ganglia-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
+			<source>../flink-metrics/flink-metrics-graphite/target/flink-metrics-graphite-${project.version}-jar-with-dependencies.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-metrics-graphite-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
+			<source>../flink-metrics/flink-metrics-statsd/target/flink-metrics-statsd-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-metrics-statsd-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+	</files>
+</assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/5c76baa1/flink-libraries/flink-gelly-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/pom.xml b/flink-libraries/flink-gelly-scala/pom.xml
index 69a7bfb..ee4549c 100644
--- a/flink-libraries/flink-gelly-scala/pom.xml
+++ b/flink-libraries/flink-gelly-scala/pom.xml
@@ -56,14 +56,6 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
-        <!-- We need to add this explicitly because through shading the dependency on asm seems
-        to go away. -->
-        <dependency>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm</artifactId>
-            <version>${asm.version}</version>
-        </dependency>
-
         <!-- the dependencies below are already provided in Flink -->
 
         <dependency>
@@ -83,6 +75,13 @@ under the License.
             <artifactId>scala-compiler</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm</artifactId>
+            <version>${asm.version}</version>
+            <scope>provided</scope>
+        </dependency>
         
         <!-- test dependencies -->
         
@@ -216,6 +215,25 @@ under the License.
 					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
 				</configuration>
 			</plugin>
+
+            <!-- build a jar-with-dependencies, to be included in the 'opt' build folder -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c76baa1/flink-libraries/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml
index c1a9a39..e6b8530 100644
--- a/flink-libraries/flink-ml/pom.xml
+++ b/flink-libraries/flink-ml/pom.xml
@@ -181,6 +181,25 @@
 					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
 				</configuration>
 			</plugin>
+
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>


[08/10] flink git commit: [hotfix] [tests] Clean up some warnings

Posted by se...@apache.org.
[hotfix] [tests] Clean up some warnings


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a6585d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a6585d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a6585d9

Branch: refs/heads/master
Commit: 3a6585d9ef15337ed1f1cf7d90102ba6b6141580
Parents: 862e347
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 20 12:27:20 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:44 2016 +0100

----------------------------------------------------------------------
 .../operators/StreamOperatorChainingTest.java   |  2 --
 .../flink/core/testutils/OneShotLatch.java      |  4 +--
 .../src/test/resources/log4j-test.properties    | 27 ++++++++++++++++
 .../src/test/resources/logback-test.xml         | 34 ++++++++++++++++++++
 4 files changed, 63 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3a6585d9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index 4e405fd..c95a85e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -142,8 +142,6 @@ public class StreamOperatorChainingTest {
 
 		StreamConfig streamConfig = new StreamConfig(configuration);
 
-		System.out.println(streamConfig);
-
 		StreamMap<Integer, Integer> headOperator =
 				streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a6585d9/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index d2eeb04..1afe952 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -45,7 +45,7 @@ public final class OneShotLatch {
 	}
 
 	/**
-	 * Waits until {@link #trigger())} is called. Once {@code #trigger()} has been called this
+	 * Waits until {@link OneShotLatch#trigger()} is called. Once {@code trigger()} has been called this
 	 * call will always return immediately.
 	 * 
 	 * @throws InterruptedException Thrown if the thread is interrupted while waiting.
@@ -59,7 +59,7 @@ public final class OneShotLatch {
 	}
 
 	/**
-	 * Waits until {@link #trigger())} is called. Once {@code #trigger()} has been called this
+	 * Waits until {@link OneShotLatch#trigger()} is called. Once {@code trigger()} has been called this
 	 * call will always return immediately.
 	 * 
 	 * <p>If the latch is not triggered within the given timeout, a {@code TimeoutException}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a6585d9/flink-test-utils-parent/flink-test-utils-junit/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/test/resources/log4j-test.properties b/flink-test-utils-parent/flink-test-utils-junit/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..4c74d85
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/3a6585d9/flink-test-utils-parent/flink-test-utils-junit/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/test/resources/logback-test.xml b/flink-test-utils-parent/flink-test-utils-junit/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+    <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file


[02/10] flink git commit: [FLINK-4669] [apis] Add createLocalEnvironment() utility method that starts the web UI

Posted by se...@apache.org.
[FLINK-4669] [apis] Add createLocalEnvironment() utility method that starts the web UI

This closes #2541


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/570ed4e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/570ed4e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/570ed4e2

Branch: refs/heads/master
Commit: 570ed4e23a22381374c7a009f7cd0d743699891c
Parents: e42a173
Author: shijinkui <sh...@huawei.com>
Authored: Tue Nov 29 12:56:25 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:16 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/ExecutionEnvironment.java    | 23 ++++++++++++++++++-
 .../flink/api/scala/ExecutionEnvironment.scala  | 24 ++++++++++++++++++--
 .../environment/StreamExecutionEnvironment.java | 21 +++++++++++++++++
 .../api/scala/StreamExecutionEnvironment.scala  | 23 +++++++++++++++++++
 4 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 964eed1..0ac9ccf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
@@ -1216,7 +1217,27 @@ public abstract class ExecutionEnvironment {
 	public static LocalEnvironment createLocalEnvironment(Configuration customConfiguration) {
 		return new LocalEnvironment(customConfiguration);
 	}
-	
+
+	/**
+	 * Creates a local execution environment that also starts the web UI.
+	 *
+	 * @return a local {@link ExecutionEnvironment} with the web UI enabled
+	 */
+	public static ExecutionEnvironment createLocalEnvWithWebUI(Configuration conf) {
+		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+		}
+
+		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+		LocalEnvironment localEnv = new LocalEnvironment(conf);
+		if (localEnv.getConfig().getParallelism() < 0) {
+			localEnv.setParallelism(defaultLocalDop);
+		}
+
+		return localEnv;
+	}
+
 	/**
 	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program 
 	 * to a cluster for execution. Note that all file paths used in the program must be accessible from the

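For illustration, a minimal sketch of how a user program could call the new
method (the class name and job body are hypothetical; the method and config
key are the ones added in the diff above):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class LocalWebUIBatchJob {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// optional: pick a non-default port for the web frontend
		conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8082);

		// local environment with the web monitoring UI enabled
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvWithWebUI(conf);

		env.fromElements(1, 2, 3)
			.map(new MapFunction<Integer, Integer>() {
				@Override
				public Integer map(Integer value) {
					return value * 2;
				}
			})
			.print();
	}
}
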
http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 18aab07..a61c65d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
 import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
 import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
@@ -748,10 +748,30 @@ object ExecutionEnvironment {
   }
 
   /**
+   * Creates a local execution environment that also starts the web UI.
+   *
+   * @return [[ExecutionEnvironment]]
+   */
+  def createLocalEnvWithWebUI(confOps: Option[Configuration] = None): ExecutionEnvironment = {
+    val conf = confOps match {
+      case Some(cf) => cf
+      case None => new Configuration
+    }
+
+    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
+    if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+      val port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT
+      conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port)
+    }
+
+    new ExecutionEnvironment(JavaEnv.createLocalEnvironment(conf))
+  }
+
+  /**
    * Creates an execution environment that uses Java Collections underneath. This will execute in a
    * single thread in the current JVM. It is very fast but will fail if the data does not fit into
    * memory. This is useful during implementation and for debugging.
- *
+   *
    * @return
    */
   @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 99784e9..6c35b09 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -1645,6 +1646,26 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Creates a local execution environment that also starts the web UI.
+	 *
+	 * @return a local {@link StreamExecutionEnvironment} with the web UI enabled
+	 */
+	public static StreamExecutionEnvironment createLocalEnvWithWebUI(Configuration conf) {
+		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+		}
+
+		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+		LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
+		if (localEnv.getConfig().getParallelism() < 0) {
+			localEnv.setParallelism(defaultLocalParallelism);
+		}
+
+		return localEnv;
+	}
+
+	/**
 	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
 	 * (parts of) the program to a cluster for execution. Note that all file
 	 * paths used in the program must be accessible from the cluster. The

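The streaming counterpart, again as a sketch under the same assumptions (the
source elements and job name are hypothetical):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUIStreamingJob {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.createLocalEnvWithWebUI(new Configuration());

		env.fromElements("a", "b", "c").print();

		// streaming programs only run once execute() is called
		env.execute("local streaming job with web UI");
	}
}
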
http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 432e8ac..d8aa1eb 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStra
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.state.AbstractStateBackend
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source._
@@ -705,6 +706,28 @@ object StreamExecutionEnvironment {
   }
 
   /**
+   * Creates a local execution environment that also starts the web UI.
+   *
+   * @param confOps an optional Flink configuration
+   * @return [[StreamExecutionEnvironment]]
+   */
+  def createLocalEnvWithWebUI(confOps: Option[Configuration] = None): StreamExecutionEnvironment = {
+    val conf = confOps match {
+      case Some(cf) => cf
+      case None => new Configuration
+    }
+
+    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
+    if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+      val port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT
+      conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port)
+    }
+
+    val parallelism = Runtime.getRuntime.availableProcessors()
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, conf))
+  }
+
+  /**
    * Creates a remote execution environment. The remote environment sends (parts of) the program to
    * a cluster for execution. Note that all file paths used in the program must be accessible from
    * the cluster. The execution will use the cluster's default parallelism, unless the


[06/10] flink git commit: [FLINK-4861] [build] Make sure 'opt' artifacts are not in the 'flink-dist' uber jar

Posted by se...@apache.org.
[FLINK-4861] [build] Make sure 'opt' artifacts are not in the 'flink-dist' uber jar

This also removes gelly-examples from the 'opt' assembly


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/862e347a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/862e347a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/862e347a

Branch: refs/heads/master
Commit: 862e347a8db37eab979b908969730c8eeb1dab2e
Parents: 5c76baa
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Dec 19 21:50:31 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:43 2016 +0100

----------------------------------------------------------------------
 flink-dist/pom.xml                     | 22 ++++++++++++++++++++--
 flink-dist/src/main/assemblies/opt.xml |  7 -------
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/862e347a/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 6e9edf1..1af0775 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -155,31 +155,39 @@ under the License.
 			<scope>compile</scope>
 		</dependency>
 
-		<!-- the following dependencies are packaged in opt/ -->
+		<!--
+			The following dependencies are packaged in 'opt/'.
+			The scope of these dependencies needs to be 'provided' so that
+			they are not included in the 'flink-dist' uber jar.
+		-->
 
 		<!-- start optional Flink metrics reporters -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-metrics-dropwizard</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-metrics-ganglia</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-metrics-graphite</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-metrics-statsd</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 		<!-- end optional Flink metrics reporters -->
 
@@ -188,36 +196,42 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-cep_2.10</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-cep-scala_2.10</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-gelly_2.10</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-gelly-scala_2.10</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-gelly-examples_2.10</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-ml_2.10</artifactId>
 			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 		<!-- end optional Flink libraries -->
 	</dependencies>
@@ -277,6 +291,8 @@ under the License.
 
 	<build>
 		<plugins>
+
+			<!-- binary compatibility checks -->
 			<plugin>
 				<groupId>com.github.siom79.japicmp</groupId>
 				<artifactId>japicmp-maven-plugin</artifactId>
@@ -284,8 +300,9 @@ under the License.
 					<skip>true</skip>
 				</configuration>
 			</plugin>
+
+			<!--Build uber jar-->
 			<plugin>
-				<!--Build uber jar-->
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-shade-plugin</artifactId>
 				<executions>
@@ -375,6 +392,7 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-deploy-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/862e347a/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index c6dc307..3622ece 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -53,13 +53,6 @@
 		</file>
 
 		<file>
-			<source>../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-${project.version}.jar</source>
-			<outputDirectory>opt/</outputDirectory>
-			<destName>flink-gelly-examples_2.10-${project.version}.jar</destName>
-			<fileMode>0644</fileMode>
-		</file>
-
-		<file>
 			<source>../flink-libraries/flink-gelly-scala/target/flink-gelly-scala_2.10-${project.version}-jar-with-dependencies.jar</source>
 			<outputDirectory>opt/</outputDirectory>
 			<destName>flink-gelly-scala_2.10-${project.version}.jar</destName>


[07/10] flink git commit: [hotfix] [tests] Fix instability in TimestampITCase

Posted by se...@apache.org.
[hotfix] [tests] Fix instability in TimestampITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7280df48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7280df48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7280df48

Branch: refs/heads/master
Commit: 7280df4896939d72e83e67d9b2a2de7449f03ba3
Parents: 3a6585d
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 20 14:49:31 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:44 2016 +0100

----------------------------------------------------------------------
 .../test/streaming/runtime/TimestampITCase.java | 21 ++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7280df48/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 59bab7d..13add4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -204,16 +204,29 @@ public class TimestampITCase extends TestLogger {
 					// try until we get the running jobs
 					List<JobID> running;
 					while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
-						Thread.sleep(50);
+						Thread.sleep(10);
 					}
 
 					JobID id = running.get(0);
 					
 					// send stop until the job is stopped
 					do {
-						cluster.stopJob(id);
-						Thread.sleep(50);
-					} while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
+						try {
+							cluster.stopJob(id);
+						}
+						catch (Exception e) {
+							if (e.getCause() instanceof IllegalStateException) {
+								// this means the job is not yet ready to be stopped,
+								// for example because it is still in CREATED state
+								// we ignore the exception 
+							} else {
+								// other problem
+								throw e;
+							}
+						}
+						Thread.sleep(10);
+					}
+					while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
 				}
 				catch (Throwable t) {
 					t.printStackTrace();


[05/10] flink git commit: [FLINK-5369] [build] Rework jsr305 and logging dependencies.

Posted by se...@apache.org.
[FLINK-5369] [build] Rework jsr305 and logging dependencies.

Currently, every project in Flink has a hard (compile scope) dependency on the jsr305, slf4j, and log4j
artifacts. That way they are pulled into every fat jar, including user fat jars as soon as they refer to
a connector or library.

This commit changes the behavior in two ways:

  1. It removes the concrete logger dependencies from the root pom file and instead adds them to the
     'flink-core' project. That way, all modules that refer to 'flink-core' will have those dependencies
     as well, while projects that have 'flink-core' as provided (connectors, libraries, user programs,
     etc.) will see those dependencies transitively as provided, too.

  2. The commit overrides the slf4j and jsr305 dependencies in the parents of 'flink-connectors',
     'flink-libraries', and 'flink-metrics' and sets them to 'provided'. That way all core projects
     pull the logger classes, but projects that are not part of flink-dist (and are instead bundled
     in fat jars) will not bundle these dependencies again.

The flink-dist puts the dependencies into the fat jar (slf4j, jsr305) or the lib folder (log4j).
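
To see the effect of this split, a small smoke test (a hypothetical class,
shown here only as a sketch): a program compiled against the slf4j API logs to
the console only if a concrete binding such as slf4j-log4j12 is on the
classpath, which is exactly what flink-dist and the updated quickstart poms
provide.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSmokeTest {
	private static final Logger LOG = LoggerFactory.getLogger(LoggingSmokeTest.class);

	public static void main(String[] args) {
		// with slf4j-log4j12 and log4j on the classpath (plus a log4j
		// configuration), this message reaches the console; with only
		// slf4j-api, slf4j reports a missing binding and discards the message
		LOG.info("logging backend is wired up");
	}
}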


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cefb8db3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cefb8db3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cefb8db3

Branch: refs/heads/master
Commit: cefb8db3525dc25421d41cfcdaaac1bd5540bc68
Parents: 9bc1063
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Dec 19 16:24:29 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:43 2016 +0100

----------------------------------------------------------------------
 flink-connectors/pom.xml                        | 17 +++++
 flink-core/pom.xml                              | 31 ++++++++-
 flink-dist/pom.xml                              | 20 +++++-
 flink-examples/pom.xml                          | 19 ++++++
 flink-java/pom.xml                              |  8 +++
 flink-libraries/flink-cep-scala/pom.xml         | 21 ++++++
 flink-libraries/flink-gelly-examples/pom.xml    | 14 ++++
 flink-libraries/flink-gelly-scala/pom.xml       | 41 ++++++++---
 flink-libraries/flink-gelly/pom.xml             |  6 ++
 flink-libraries/flink-ml/pom.xml                | 26 +++++++
 flink-libraries/pom.xml                         | 18 +++++
 flink-metrics/pom.xml                           | 18 +++++
 .../main/resources/archetype-resources/pom.xml  | 33 +++++++++
 .../main/resources/archetype-resources/pom.xml  | 29 ++++++++
 flink-runtime/pom.xml                           |  5 ++
 flink-scala/pom.xml                             |  6 --
 pom.xml                                         | 71 ++++++++++++++------
 17 files changed, 344 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index ba5ce46..c563d92 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -54,6 +54,23 @@ under the License.
 		<module>flink-connector-filesystem</module>
 	</modules>
 
+	<!-- override these root dependencies as 'provided', so they don't end up
+		in the jars-with-dependencies (uber jars) of connectors and
+		user programs that depend on the connectors -->
+
+	<dependencies>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.findbugs</groupId>
+			<artifactId>jsr305</artifactId>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
 	<!-- See main pom.xml for explanation of profiles -->
 	<profiles>
 		<!--

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 396ef86..ea261cf 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -47,6 +47,14 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<!-- standard utilities -->
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-lang3</artifactId>
+			<!-- managed version -->
+		</dependency>
+
+		<!-- for the fallback generic serializer -->
 		<dependency>
 			<groupId>com.esotericsoftware.kryo</groupId>
 			<artifactId>kryo</artifactId>
@@ -87,7 +95,28 @@ under the License.
 			<version>${asm.version}</version>
 		</dependency>
 
-		<!-- test dependencies -->
+		<!--
+			Because there are no logger implementation dependencies in the root pom, we
+			add them here so that they are available when executing code (core
+			and examples) in the IDE.
+
+			NOTE: Once we are confident that users will use the newer quickstart templates,
+			we can drop these dependencies and only add them to 'flink-dist' and as test
+			dependencies
+		-->
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+		</dependency>
+
+		<!-- ================== test dependencies ================== -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils-junit</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 93feec6..25dc708 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -35,7 +35,8 @@ under the License.
 
 	<dependencies>
 
-		<!-- BINARIES -->
+		<!-- Flink project binaries -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
@@ -137,7 +138,22 @@ under the License.
 			<artifactId>flink-yarn_2.10</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		
+
+		<!-- Concrete logging framework - we add it only here so as not to
+			tie the projects to one specific framework and to make it easier
+			for users to swap logging frameworks -->
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<scope>compile</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<scope>compile</scope>
+		</dependency>
 	</dependencies>
 
 	<profiles>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml
index e47ad2c..e285b63 100644
--- a/flink-examples/pom.xml
+++ b/flink-examples/pom.xml
@@ -33,6 +33,9 @@ under the License.
 	<packaging>pom</packaging>
 
 	<dependencies>
+
+		<!-- Flink dependencies -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
@@ -44,6 +47,22 @@ under the License.
 			<artifactId>flink-clients_2.10</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
+		<!-- Add a logging framework, to make the examples produce -->
+		<!--             logs when executing in the IDE            -->
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<scope>compile</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<scope>compile</scope>
+		</dependency>
+
 	</dependencies>
 
 	<modules>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 728b6ee..846209f 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -55,10 +55,18 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-lang3</artifactId>
+			<!-- managed version -->
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-math3</artifactId>
 			<!-- managed version -->
 		</dependency>
 
+		<!-- test dependencies -->
+
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-libraries/flink-cep-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/pom.xml b/flink-libraries/flink-cep-scala/pom.xml
index a613d44..81e5190 100644
--- a/flink-libraries/flink-cep-scala/pom.xml
+++ b/flink-libraries/flink-cep-scala/pom.xml
@@ -52,12 +52,33 @@ under the License.
         </dependency>
 
         <!-- We need to add this explicitly due to shading -->
+
         <dependency>
             <groupId>org.ow2.asm</groupId>
             <artifactId>asm</artifactId>
             <version>${asm.version}</version>
         </dependency>
 
+        <!-- the dependencies below are already provided in Flink -->
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test dependencies -->
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-libraries/flink-gelly-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index 9b90b04..28c7c67 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -61,6 +61,20 @@
 			<version>${project.version}</version>
 		</dependency>
 
+		<!-- to be able to execute the examples properly in common IDEs, we need to
+			restate these dependencies in 'compile' scope -->
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.findbugs</groupId>
+			<artifactId>jsr305</artifactId>
+			<scope>compile</scope>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-libraries/flink-gelly-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/pom.xml b/flink-libraries/flink-gelly-scala/pom.xml
index 08aeace..69a7bfb 100644
--- a/flink-libraries/flink-gelly-scala/pom.xml
+++ b/flink-libraries/flink-gelly-scala/pom.xml
@@ -34,12 +34,28 @@ under the License.
     <packaging>jar</packaging>
 
     <dependencies>
+        
+        <!-- core dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-scala_2.10</artifactId>
             <version>${project.version}</version>
-			<scope>provided</scope>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-gelly_2.10</artifactId>
+            <version>${project.version}</version>
         </dependency>
+
         <!-- We need to add this explicitly because through shading the dependency on asm seems
         to go away. -->
         <dependency>
@@ -47,16 +63,25 @@ under the License.
             <artifactId>asm</artifactId>
             <version>${asm.version}</version>
         </dependency>
+
+        <!-- the dependencies below are already provided in Flink -->
+
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_2.10</artifactId>
-            <version>${project.version}</version>
-			<scope>provided</scope>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <scope>provided</scope>
         </dependency>
+
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-gelly_2.10</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <scope>provided</scope>
         </dependency>
         
         <!-- test dependencies -->

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-libraries/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/pom.xml b/flink-libraries/flink-gelly/pom.xml
index ff39298..7feb177 100644
--- a/flink-libraries/flink-gelly/pom.xml
+++ b/flink-libraries/flink-gelly/pom.xml
@@ -52,6 +52,12 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-lang3</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
 		<!-- test dependencies -->
 		
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-libraries/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml
index 83a53b7..c1a9a39 100644
--- a/flink-libraries/flink-ml/pom.xml
+++ b/flink-libraries/flink-ml/pom.xml
@@ -50,6 +50,32 @@
 			<version>0.12</version>
 		</dependency>
 
+		<!-- the dependencies below are already provided in Flink -->
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-reflect</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-compiler</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-math3</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index c0698c8..31f6e03 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -43,4 +43,22 @@ under the License.
 		<module>flink-cep</module>
 		<module>flink-cep-scala</module>
 	</modules>
+
+	<!-- override these root dependencies as 'provided', so they don't end up
+		in the jars-with-dependencies (uber jars) of the libraries and
+		user programs that depend on them -->
+
+	<dependencies>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.findbugs</groupId>
+			<artifactId>jsr305</artifactId>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 629c50d..90785b4 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -41,4 +41,22 @@ under the License.
 		<module>flink-metrics-jmx</module>
 		<module>flink-metrics-statsd</module>
 	</modules>
+
+	<!-- override these root dependencies as 'provided', so they don't end up
+		in the jars-with-dependencies. They are already contained
+		in the flink-dist build -->
+
+	<dependencies>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.findbugs</groupId>
+			<artifactId>jsr305</artifactId>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 57f3e25..a502037 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -31,6 +31,8 @@ under the License.
 	<properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 		<flink.version>1.2-SNAPSHOT</flink.version>
+		<slf4j.version>1.7.7</slf4j.version>
+		<log4j.version>1.2.17</log4j.version>
 	</properties>
 
 	<repositories>
@@ -72,6 +74,7 @@ under the License.
 	-->
 
 	<dependencies>
+		<!-- Apache Flink dependencies -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
@@ -87,15 +90,31 @@ under the License.
 			<artifactId>flink-clients_2.10</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
+
+		<!-- explicitly add a standard logging framework, as Flink will not (in the
+			future) have a hard dependency on one specific framework by default -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>${slf4j.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${log4j.version}</version>
+		</dependency>
+		
 	</dependencies>
 
 	<profiles>
 		<profile>
 			<!-- Profile for packaging correct JAR files -->
 			<id>build-jar</id>
+
 			<activation>
 				<activeByDefault>false</activeByDefault>
 			</activation>
+
 			<dependencies>
 				<dependency>
 					<groupId>org.apache.flink</groupId>
@@ -115,6 +134,18 @@ under the License.
 					<version>${flink.version}</version>
 					<scope>provided</scope>
 				</dependency>
+				<dependency>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+					<version>${slf4j.version}</version>
+					<scope>provided</scope>
+				</dependency>
+				<dependency>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+					<version>${log4j.version}</version>
+					<scope>provided</scope>
+				</dependency>
 			</dependencies>
 
 			<build>
@@ -191,6 +222,8 @@ under the License.
 									versions of these dependencies.
 
 									-->
+
+									<exclude>log4j:log4j</exclude>
 									<exclude>org.scala-lang:scala-library</exclude>
 									<exclude>org.scala-lang:scala-compiler</exclude>
 									<exclude>org.scala-lang:scala-reflect</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 24225f6..c7211b8 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -46,6 +46,8 @@ under the License.
 	<properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 		<flink.version>1.2-SNAPSHOT</flink.version>
+		<slf4j.version>1.7.7</slf4j.version>
+		<log4j.version>1.2.17</log4j.version>
 	</properties>
 
 	<!-- 
@@ -73,6 +75,7 @@ under the License.
 	-->
 
 	<dependencies>
+		<!-- Apache Flink dependencies -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-scala_2.10</artifactId>
@@ -88,6 +91,19 @@ under the License.
 			<artifactId>flink-clients_2.10</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
+		
+		<!-- explicitly add a standard logging framework, as Flink will not (in the
+			future) have a hard dependency on one specific framework by default -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>${slf4j.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${log4j.version}</version>
+		</dependency>
 	</dependencies>
 
 	<profiles>
@@ -116,6 +132,18 @@ under the License.
 					<version>${flink.version}</version>
 					<scope>provided</scope>
 				</dependency>
+				<dependency>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+					<version>${slf4j.version}</version>
+					<scope>provided</scope>
+				</dependency>
+				<dependency>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+					<version>${log4j.version}</version>
+					<scope>provided</scope>
+				</dependency>
 			</dependencies>
 
 			<build>
@@ -196,6 +224,7 @@ under the License.
 
 									-->
 
+									<exclude>log4j:log4j</exclude>
 									<exclude>org.scala-lang:scala-library</exclude>
 									<exclude>org.scala-lang:scala-compiler</exclude>
 									<exclude>org.scala-lang:scala-reflect</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index e522d77..4a35304 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -57,6 +57,11 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-lang3</artifactId>
+		</dependency>
+		
+		<dependency>
 			<groupId>commons-cli</groupId>
 			<artifactId>commons-cli</artifactId>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 88f49e5..3777de5 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -44,12 +44,6 @@ under the License.
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-optimizer_2.10</artifactId>
-			<version>${project.version}</version>
-		</dependency>
 
 		<dependency>
 			<groupId>org.scala-lang</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/cefb8db3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9676976..6f16aa6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,7 +91,6 @@ under the License.
 		<flink.forkCount>1C</flink.forkCount>
 		<flink.reuseForks>true</flink.reuseForks>
 		<log4j.configuration>log4j-test.properties</log4j.configuration>
-		<slf4j.version>1.7.7</slf4j.version>
 		<guava.version>18.0</guava.version>
 		<akka.version>2.3-custom</akka.version>
 		<java.version>1.7</java.version>
@@ -125,35 +124,21 @@ under the License.
 			<version>1.2-SNAPSHOT</version>
 		</dependency>
 
-		<!-- Add the 'javax.annotation' annotations (JSR305), such as '@Nullable' -->
-		<dependency>
-			<groupId>com.google.code.findbugs</groupId>
-			<artifactId>jsr305</artifactId>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-lang3</artifactId>
-			<version>3.3.2</version>
-		</dependency>
+		<!-- Root dependencies for all projects -->
 
+		<!-- Logging API -->
 		<dependency>
 			<groupId>org.slf4j</groupId>
 			<artifactId>slf4j-api</artifactId>
-			<version>${slf4j.version}</version>
 		</dependency>
 
+		<!-- 'javax.annotation' classes like '@Nullable' -->
 		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-log4j12</artifactId>
-			<version>${slf4j.version}</version>
+			<groupId>com.google.code.findbugs</groupId>
+			<artifactId>jsr305</artifactId>
 		</dependency>
 
-		<dependency>
-			<groupId>log4j</groupId>
-			<artifactId>log4j</artifactId>
-			<version>1.2.17</version>
-		</dependency>
+		<!-- test dependencies -->
 
 		<dependency>
 			<groupId>junit</groupId>
@@ -194,9 +179,27 @@ under the License.
 			<type>jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<!-- tests will have log4j as the default logging framework available -->
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<type>jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<type>jar</type>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<!-- this section defines the module versions that are used if nothing else is specified. -->
+	
 	<dependencyManagement>
 		<!-- WARN: 
 			DO NOT put 	guava, 
@@ -215,7 +218,31 @@ under the License.
 				<artifactId>jsr305</artifactId>
 				<version>1.3.9</version>
 			</dependency>
-			
+
+			<dependency>
+				<groupId>org.slf4j</groupId>
+				<artifactId>slf4j-api</artifactId>
+				<version>1.7.7</version>
+			</dependency>
+
+			<dependency>
+				<groupId>org.slf4j</groupId>
+				<artifactId>slf4j-log4j12</artifactId>
+				<version>1.7.7</version>
+			</dependency>
+
+			<dependency>
+				<groupId>log4j</groupId>
+				<artifactId>log4j</artifactId>
+				<version>1.2.17</version>
+			</dependency>
+
+			<dependency>
+				<groupId>org.apache.commons</groupId>
+				<artifactId>commons-lang3</artifactId>
+				<version>3.3.2</version>
+			</dependency>
+
 			<!-- Make sure we use a consistent avro version throughout the project -->
 			<dependency>
 				<groupId>org.apache.avro</groupId>