Posted to commits@flink.apache.org by tz...@apache.org on 2018/06/22 09:21:54 UTC

[1/6] flink git commit: [FLINK-8795] Fixed local scala shell for Flip6

Repository: flink
Updated Branches:
  refs/heads/release-1.5 e0265062e -> b725982e5


[FLINK-8795] Fixed local scala shell for Flip6

This closes #6182.
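
With this change, the shell's `local` mode starts an embedded cluster under both the legacy and FLIP-6 code paths, chosen by the configured `mode` (`CoreOptions.MODE`). A quick smoke test from a Flink distribution (a sketch, assuming the default flink-conf.yaml):

    # start the Scala shell against an embedded local cluster
    ./bin/start-scala-shell.sh local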


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab0f587a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab0f587a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab0f587a

Branch: refs/heads/release-1.5
Commit: ab0f587a1e007301c749ccfc6052d3e401d275e8
Parents: e026506
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Tue Jun 19 09:49:31 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 11:20:22 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/FlinkShell.scala | 36 +++++++---
 .../flink/api/scala/ScalaShellITCase.scala      | 72 +++++++++++++-------
 .../scala/ScalaShellLocalStartupITCase.scala    | 14 +++-
 3 files changed, 87 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab0f587a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index dbdc052..b74a8a0 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -23,9 +23,9 @@ import java.io._
 import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser}
 import org.apache.flink.client.deployment.ClusterDescriptor
 import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.configuration.{Configuration, GlobalConfiguration, JobManagerOptions}
+import org.apache.flink.configuration.{Configuration, CoreOptions, GlobalConfiguration, JobManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
+import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.tools.nsc.Settings
@@ -137,20 +137,36 @@ object FlinkShell {
     }
   }
 
+  private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]
+
   def fetchConnectionInfo(
     configuration: Configuration,
     config: Config
-  ): (String, Int, Option[Either[StandaloneMiniCluster, ClusterClient[_]]]) = {
+  ): (String, Int, Option[Either[LocalCluster, ClusterClient[_]]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = configuration
         config.setInteger(JobManagerOptions.PORT, 0)
 
-        val miniCluster = new StandaloneMiniCluster(config)
+        val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
+          case CoreOptions.LEGACY_MODE => {
+            val cluster = new StandaloneMiniCluster(config)
+
+            (Left(cluster), cluster.getPort)
+          }
+          case CoreOptions.NEW_MODE => {
+            val miniClusterConfig = new MiniClusterConfiguration.Builder()
+              .setConfiguration(config)
+              .build()
+            val cluster = new MiniCluster(miniClusterConfig)
+            cluster.start()
+
+            (Right(cluster), cluster.getRestAddress.getPort)
+          }
+        }
 
-        println("\nStarting local Flink cluster (host: localhost, " +
-          s"port: ${miniCluster.getPort}).\n")
-        ("localhost", miniCluster.getPort, Some(Left(miniCluster)))
+        println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+        ("localhost", port, Some(Left(miniCluster)))
 
       case ExecutionMode.REMOTE => // Remote mode
         if (config.host.isEmpty || config.port.isEmpty) {
@@ -193,7 +209,8 @@ object FlinkShell {
     val (repl, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(miniCluster)) => miniCluster.getConfiguration
+        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
+        case Some(Left(Right(_))) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -223,7 +240,8 @@ object FlinkShell {
     } finally {
       repl.closeInterpreter()
       cluster match {
-        case Some(Left(miniCluster)) => miniCluster.close()
+        case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
+        case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
         case Some(Right(yarnCluster)) => yarnCluster.shutdown()
         case _ =>
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab0f587a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 2e07fb9..12522a8 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,19 +19,16 @@
 package org.apache.flink.api.scala
 
 import java.io._
+import java.util.Objects
 
-import akka.actor.ActorRef
-import akka.pattern.Patterns
-import org.apache.flink.configuration.{Configuration, CoreOptions, TaskManagerOptions}
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
+import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
+import org.apache.flink.test.util.MiniClusterResource
 import org.apache.flink.util.TestLogger
-import org.junit.rules.TemporaryFolder
 import org.junit._
+import org.junit.rules.TemporaryFolder
 
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.FiniteDuration
 import scala.tools.nsc.Settings
 
 class ScalaShellITCase extends TestLogger {
@@ -280,11 +277,11 @@ class ScalaShellITCase extends TestLogger {
     BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
 
     val args = cluster match {
-      case Some(cl) =>
+      case Some(_) =>
         Array(
           "remote",
-          cl.getHostname,
-          Integer.toString(cl.getPort),
+          hostname,
+          Integer.toString(port),
           "--configDir",
           dir.getAbsolutePath)
       case None => throw new IllegalStateException("Cluster has not been started.")
@@ -314,17 +311,43 @@ class ScalaShellITCase extends TestLogger {
 }
 
 object ScalaShellITCase {
-  var cluster: Option[StandaloneMiniCluster] = None
 
-  val parallelism = 4
   val configuration = new Configuration()
+  var cluster: Option[Either[MiniCluster, StandaloneMiniCluster]] = None
+
+  var port: Int = _
+  var hostname : String = _
+  val parallelism: Int = 4
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-    configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-
-    cluster = Option(new StandaloneMiniCluster(configuration))
+    val isNew = Objects.equals(MiniClusterResource.NEW_CODEBASE,
+      System.getProperty(MiniClusterResource.CODEBASE_KEY))
+    if (isNew) {
+      configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+      // use a port different from the default so as not to interfere with ScalaShellLocalStartupITCase
+      configuration.setInteger(RestOptions.PORT, 8082)
+      val miniConfig = new MiniClusterConfiguration.Builder()
+        .setConfiguration(configuration)
+        .setNumSlotsPerTaskManager(parallelism)
+        .build()
+
+      val miniCluster = new MiniCluster(miniConfig)
+      miniCluster.start()
+      port = miniCluster.getRestAddress.getPort
+      hostname = miniCluster.getRestAddress.getHost
+
+      cluster = Some(Left(miniCluster))
+    } else {
+      configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
+      configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
+      val standaloneCluster = new StandaloneMiniCluster(configuration)
+
+      hostname = standaloneCluster.getHostname
+      port = standaloneCluster.getPort
+
+      cluster = Some(Right(standaloneCluster))
+    }
   }
 
   @AfterClass
@@ -332,7 +355,10 @@ object ScalaShellITCase {
     // The Scala interpreter somehow changes the class loader. Therefore, we have to reset it
     Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
 
-    cluster.foreach(_.close)
+    cluster.foreach {
+      case Left(miniCluster) => miniCluster.close()
+      case Right(miniCluster) => miniCluster.close()
+    }
   }
 
   /**
@@ -350,18 +376,18 @@ object ScalaShellITCase {
     System.setOut(new PrintStream(baos))
 
     cluster match {
-      case Some(cl) =>
+      case Some(_) =>
         val repl = externalJars match {
           case Some(ej) => new FlinkILoop(
-            cl.getHostname,
-            cl.getPort,
+            hostname,
+            port,
             configuration,
             Option(Array(ej)),
             in, new PrintWriter(out))
 
           case None => new FlinkILoop(
-            cl.getHostname,
-            cl.getPort,
+            hostname,
+            port,
             configuration,
             in, new PrintWriter(out))
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab0f587a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index 9365948..a971db8 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -19,12 +19,14 @@
 package org.apache.flink.api.scala
 
 import java.io._
+import java.util.Objects
 
 import org.apache.flink.configuration.{Configuration, CoreOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
+import org.apache.flink.test.util.MiniClusterResource
 import org.apache.flink.util.TestLogger
-import org.junit.{Assert, Rule, Test}
 import org.junit.rules.TemporaryFolder
+import org.junit.{Assert, Rule, Test}
 
 class ScalaShellLocalStartupITCase extends TestLogger {
 
@@ -85,7 +87,13 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     System.setOut(new PrintStream(baos))
 
     val configuration = new Configuration()
-    configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
+    val mode = if (Objects.equals(MiniClusterResource.NEW_CODEBASE,
+      System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+      CoreOptions.NEW_MODE
+    } else {
+      CoreOptions.LEGACY_MODE
+    }
+    configuration.setString(CoreOptions.MODE, mode)
 
     val dir = temporaryFolder.newFolder()
     BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
@@ -93,7 +101,7 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     val args: Array[String] = Array("local", "--configDir", dir.getAbsolutePath)
 
     //start flink scala shell
-    FlinkShell.bufferedReader = Some(in);
+    FlinkShell.bufferedReader = Some(in)
     FlinkShell.main(args)
 
     baos.flush()


[4/6] flink git commit: [FLINK-9638][E2E Tests] Add helper script to run single test

Posted by tz...@apache.org.
[FLINK-9638][E2E Tests] Add helper script to run single test

This commit adds a helper script `run-single-test.sh` that allows you to run
a single test in the context of the test runner.

This provides you with
* Setup of ENV variables
* Cleanup after the test
* Nicer output

Usage: ./run-single-test.sh <path-to-test-script> [<arg1> <arg2> ...]
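
For example, running the batch WordCount test against a local distribution might look like this (the distribution path is illustrative):

    export FLINK_DIR=/path/to/flink-1.5.0
    ./run-single-test.sh test-scripts/test_batch_wordcount.sh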

This closes #6197.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec8a743d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec8a743d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec8a743d

Branch: refs/heads/release-1.5
Commit: ec8a743da43d7bc7b7101d05d44b8809622ef2ec
Parents: d7d6825
Author: Florian Schmidt <fl...@icloud.com>
Authored: Thu Jun 21 15:42:08 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 11:20:51 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/run-single-test.sh | 52 ++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec8a743d/flink-end-to-end-tests/run-single-test.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-single-test.sh b/flink-end-to-end-tests/run-single-test.sh
new file mode 100755
index 0000000..86b313d
--- /dev/null
+++ b/flink-end-to-end-tests/run-single-test.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This can be used to run a single test in the context of a test runner
+# Usage: ./run-single-test.sh test-scripts/my-test-case.sh
+
+if [ $# -eq 0 ]; then
+    echo "Usage: ./run-single-test.sh <path-to-test-script> [<arg1> <arg2> ...]"
+    exit 1
+fi
+
+END_TO_END_DIR="`dirname \"$0\"`" # relative
+END_TO_END_DIR="`( cd \"$END_TO_END_DIR\" && pwd )`" # absolutized and normalized
+if [ -z "$END_TO_END_DIR" ] ; then
+    # error; for some reason, the path is not accessible
+    # to the script (e.g. permissions re-evaled after suid)
+    exit 1  # fail
+fi
+
+export END_TO_END_DIR
+
+if [ -z "$FLINK_DIR" ] ; then
+    echo "You have to export the Flink distribution directory as FLINK_DIR"
+    exit 1
+fi
+
+source "${END_TO_END_DIR}/test-scripts/test-runner-common.sh"
+
+FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd )`" # absolutized and normalized
+
+echo "flink-end-to-end-test directory: $END_TO_END_DIR"
+echo "Flink distribution directory: $FLINK_DIR"
+
+run_test "$*" "$*"
+
+exit 0


[5/6] flink git commit: [FLINK-9594][E2E tests] Improve docs for new test runner

Posted by tz...@apache.org.
[FLINK-9594][E2E tests] Improve docs for new test runner

This closes #6172.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c1e3f02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c1e3f02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c1e3f02

Branch: refs/heads/release-1.5
Commit: 2c1e3f02fd7b83ee21c5ac851309b2fb5fbff4f0
Parents: ec8a743
Author: Florian Schmidt <fl...@icloud.com>
Authored: Fri Jun 15 11:13:39 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 11:20:58 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/README.md | 42 +++++++++++++++++++++++++++++++----
 1 file changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c1e3f02/flink-end-to-end-tests/README.md
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/README.md b/flink-end-to-end-tests/README.md
index 82d7e41..26dba3d 100644
--- a/flink-end-to-end-tests/README.md
+++ b/flink-end-to-end-tests/README.md
@@ -28,11 +28,45 @@ where <flink dir> is a Flink distribution directory.
 You can also run tests individually via
 
 ```
-$ FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh
+$ FLINK_DIR=<flink dir> flink-end-to-end-tests/run-single-test.sh your_test.sh arg1 arg2
 ```
 
 ## Writing Tests
 
-Have a look at test_batch_wordcount.sh for a very basic test and
-test_streaming_kafka010.sh for a more involved example. Whenever possible, try
-to put new functionality in common.sh so that it can be reused by other tests.
+### Examples
+Have a look at `test_batch_wordcount.sh` for a very basic test and
+`test_streaming_kafka010.sh` for a more involved example. Whenever possible, try
+to put new functionality in `common.sh` so that it can be reused by other tests.
+
+### Adding a test case
+In order to add a new test case you need to add it to `test-scripts/run-nightly-tests.sh`, `test-scripts/run-pre-commit-tests.sh`, or both. Templates on how to add tests can be found in those respective files.
+
+_Note: If you want to parameterize your tests please do so by adding multiple test cases with parameters as arguments to the nightly / pre-commit test suites. This allows the test runner to do a cleanup in between each individual test and also to fail those tests individually._
+
+### Passing your test
+A test is considered to have passed if:
+- it exits with code 0
+- there are no non-empty .out files (nothing was written to stdout / stderr by your Flink program)
+- there are no exceptions in the log files
+- there are no errors in the log files
+
+_Note: There is a whitelist of exceptions and errors that do not lead to failure; it can be found in the `check_logs_for_errors` and `check_logs_for_exceptions` functions in `test-scripts/common.sh`._
+
+Please note that the previously supported pattern of assigning a value to the global variable `PASS` to make your tests fail **is not supported anymore**.
+
+### Cleanup
+The test runner performs a cleanup after each test case, which includes:
+- Stopping the cluster
+- Killing all task and job managers
+- Reverting config to default (if changed before)
+- Cleaning up log and temp directories
+
+In some cases your test is required to do some *additional* cleanup, for example shutting down external systems such as Kafka or Elasticsearch. In this case it is a common pattern to trap a `test_cleanup` function on `EXIT` like this:
+
+```sh
+function test_cleanup {
+    # do your custom cleanup here
+}
+
+trap test_cleanup EXIT
+```
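
As a sketch of the "Adding a test case" section above, a new nightly entry would follow the existing `run_test` pattern in `run-nightly-tests.sh` (the test name and script here are hypothetical):

    run_test "My feature end-to-end test" "$END_TO_END_DIR/test-scripts/test_my_feature.sh arg1 arg2"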


[2/6] flink git commit: [hotfix] [docs] Fix a typo in index.md

Posted by tz...@apache.org.
[hotfix] [docs] Fix a typo in index.md

This closes #6193.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0448d518
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0448d518
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0448d518

Branch: refs/heads/release-1.5
Commit: 0448d5182c0261137967fe398d0cddc53958f54e
Parents: ab0f587
Author: MichealShin <lu...@126.com>
Authored: Thu Jun 21 16:50:34 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 11:20:30 2018 +0200

----------------------------------------------------------------------
 docs/index.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0448d518/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index ea3efe1..9217a24 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -31,7 +31,7 @@ Apache Flink is an open source platform for distributed stream and batch data pr
 
 ## First Steps
 
-- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommended you read these sections first.
+- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommend you read these sections first.
 
 - **Quickstarts**: [Run an example program](quickstart/setup_quickstart.html) on your local machine or [study some examples](examples/index.html).
 


[3/6] flink git commit: [FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints

Posted by tz...@apache.org.
[FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints

This closes #6038.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7d68253
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7d68253
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7d68253

Branch: refs/heads/release-1.5
Commit: d7d68253f59ab906519ef92e1d463987969e6aa0
Parents: 0448d51
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 18 00:33:12 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 11:20:41 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh     | 24 ++++++---
 .../test_resume_externalized_checkpoints.sh     | 56 +++++++++++++++-----
 2 files changed, 60 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7d68253/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index da029f8..5399893 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -58,13 +58,23 @@ run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$E
 run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
 run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
 
-run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true"
-run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false"
-run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks"
-
-run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
-run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
-run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true"
+run_test "Resuming Externalized Checkpoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file false true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true true"
+
+run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true true"
 
 run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
 run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d68253/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 6472b23..29c3786 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -17,13 +17,28 @@
 # limitations under the License.
 ################################################################################
 
+if [ -z $1 ] || [ -z $2 ]; then
+ echo "Usage: ./test_resume_externalized_checkpoints.sh <original_dop> <new_dop> <state_backend_setting> <state_backend_file_async_setting> <state_backend_rocks_incremental_setting>"
+ exit 1
+fi
+
 source "$(dirname "$0")"/common.sh
 
-STATE_BACKEND_TYPE=${1:-file}
-STATE_BACKEND_FILE_ASYNC=${2:-true}
-SIMULATE_FAILURE=${3:-false}
+ORIGINAL_DOP=$1
+NEW_DOP=$2
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${5:-false}
+SIMULATE_FAILURE=${6:-false}
+
+if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+ NUM_SLOTS=$ORIGINAL_DOP
+else
+ NUM_SLOTS=$NEW_DOP
+fi
 
 backup_config
+change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
 setup_flink_slf4j_metric_reporter
 start_cluster
 
@@ -43,18 +58,30 @@ CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
 
 # run the DataStream allround job
 
-echo "Running externalized checkpoints test, with STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
+echo "Running externalized checkpoints test, \
+with ORIGINAL_DOP=$ORIGINAL_DOP NEW_DOP=$NEW_DOP \
+and STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC \
+STATE_BACKEND_ROCKSDB_INCREMENTAL=$STATE_BACKEND_ROCKS_INCREMENTAL SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
 
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-BASE_JOB_CMD="$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.externalize_checkpoint true \
-  --environment.externalize_checkpoint.cleanup retain \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1"
+
+function buildBaseJobCmd {
+  local dop=$1
+
+  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $dop \
+    --environment.externalize_checkpoint true \
+    --environment.externalize_checkpoint.cleanup retain \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --state_backend.rocks.incremental $STATE_BACKEND_ROCKS_INCREMENTAL \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1"
+}
+
+BASE_JOB_CMD=`buildBaseJobCmd $ORIGINAL_DOP`
 
 JOB_CMD=""
 if [[ $SIMULATE_FAILURE == "true" ]]; then
@@ -98,6 +125,9 @@ if (( $NUM_CHECKPOINTS > 1 )); then
 fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
+
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+
 DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
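
Like the nightly entries above, the new positional parameters can also be exercised via the `run-single-test.sh` helper from this batch; for example, scaling an incremental RocksDB job from 2 to 4 (a sketch; the distribution path is illustrative):

    FLINK_DIR=/path/to/flink-1.5.0 ./run-single-test.sh test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true true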


[6/6] flink git commit: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer backpressuring

Posted by tz...@apache.org.
[FLINK-9374] [kinesis] Enable FlinkKinesisProducer backpressuring

This closes #9374.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b725982e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b725982e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b725982e

Branch: refs/heads/release-1.5
Commit: b725982e5758043ba3aa53bde1615569336e451e
Parents: 2c1e3f0
Author: Franz Thoma <fr...@tngtech.com>
Authored: Wed May 9 08:27:47 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 11:21:19 2018 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 30 +++++++
 .../kinesis/FlinkKinesisProducer.java           | 70 ++++++++++++++-
 .../connectors/kinesis/util/TimeoutLatch.java   | 44 +++++++++
 .../kinesis/FlinkKinesisProducerTest.java       | 95 +++++++++++++++-----
 .../flink/core/testutils/CheckedThread.java     | 13 ++-
 .../flink/core/testutils/MultiShotLatch.java    |  9 ++
 6 files changed, 239 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 7551142..03224a1 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -344,6 +344,36 @@ Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL fr
 
 Users can still switch back to one-thread-per-request mode by setting a key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`, as shown in the code commented out in the above example.
 
+### Backpressure
+
+By default, `FlinkKinesisProducer` does not backpressure. Instead, records that
+cannot be sent because of the rate restriction of 1 MB per second per shard are
+buffered in an unbounded queue and dropped when their `RecordTtl` expires.
+
+To avoid data loss, you can enable backpressuring by restricting the size of the
+internal queue:
+
+```
+// 200 Bytes per record, 1 shard
+kinesis.setQueueLimit(500);
+```
+
+The value for `queueLimit` depends on the expected record size. To choose a good
+value, consider that Kinesis is rate-limited to 1 MB per second per shard. If
+less than one second's worth of records is buffered, then the queue may not be
+able to operate at full capacity. With the default `RecordMaxBufferedTime` of
+100ms, a queue size of 100kB per shard should be sufficient. The `queueLimit`
+can then be computed via
+
+```
+queue limit = (number of shards * queue size per shard) / record size
+```
+
+E.g. for 200 bytes per record and 8 shards, a queue limit of 4000 is a good
+starting point. If the queue size limits throughput (below 1 MB per second per
+shard), try increasing the queue limit slightly.
+
+
 ## Using Non-AWS Kinesis Endpoints for Testing
 
 It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
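
As a quick sanity check of the queue-limit formula in the docs above, using its own example numbers (8 shards, 100 kB per shard, 200-byte records):

    echo $(( (8 * 100000) / 200 ))   # prints 4000, the suggested starting point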

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index a9b48ae..b086ac1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -21,12 +21,15 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.TimeoutLatch;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.amazonaws.services.kinesis.producer.Attempt;
@@ -55,6 +58,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
 
+	public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
+
+	public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
+
+	public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
+
 	private static final long serialVersionUID = 6447077318449477846L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
@@ -65,6 +74,9 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	/* Flag controlling the error behavior of the producer */
 	private boolean failOnError = false;
 
+	/* Maximum length of the internal record queue before backpressuring */
+	private int queueLimit = Integer.MAX_VALUE;
+
 	/* Name of the default stream to produce to. Can be overwritten by the serialization schema */
 	private String defaultStream;
 
@@ -82,9 +94,15 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	/* Our Kinesis instance for each parallel Flink sink */
 	private transient KinesisProducer producer;
 
+	/* Backpressuring waits for this latch, triggered by record callback */
+	private transient TimeoutLatch backpressureLatch;
+
 	/* Callback handling failures */
 	private transient FutureCallback<UserRecordResult> callback;
 
+	/* Counts how often we have to wait for KPL because we are above the queue limit */
+	private transient Counter backpressureCycles;
+
 	/* Field for async exception */
 	private transient volatile Throwable thrownException;
 
@@ -145,6 +163,18 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	}
 
 	/**
+	 * The {@link KinesisProducer} holds an unbounded queue internally. To avoid memory
+	 * problems under high loads, a limit can be employed above which the internal queue
+	 * will be flushed, thereby applying backpressure.
+	 *
+	 * @param queueLimit The maximum length of the internal queue before backpressuring
+	 */
+	public void setQueueLimit(int queueLimit) {
+		checkArgument(queueLimit > 0, "queueLimit must be a positive number");
+		this.queueLimit = queueLimit;
+	}
+
+	/**
 	 * Set a default stream name.
 	 * @param defaultStream Name of the default Kinesis stream
 	 */
@@ -180,9 +210,16 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
 
 		producer = getKinesisProducer(producerConfig);
+
+		final MetricGroup kinesisMetricGroup = getRuntimeContext().getMetricGroup().addGroup(KINESIS_PRODUCER_METRIC_GROUP);
+		this.backpressureCycles = kinesisMetricGroup.counter(METRIC_BACKPRESSURE_CYCLES);
+		kinesisMetricGroup.gauge(METRIC_OUTSTANDING_RECORDS_COUNT, producer::getOutstandingRecordsCount);
+
+		backpressureLatch = new TimeoutLatch();
 		callback = new FutureCallback<UserRecordResult>() {
 			@Override
 			public void onSuccess(UserRecordResult result) {
+				backpressureLatch.trigger();
 				if (!result.isSuccessful()) {
 					if (failOnError) {
 						// only remember the first thrown exception
@@ -197,6 +234,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 
 			@Override
 			public void onFailure(Throwable t) {
+				backpressureLatch.trigger();
 				if (failOnError) {
 					thrownException = t;
 				} else {
@@ -219,6 +257,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 		}
 
 		checkAndPropagateAsyncError();
+		boolean didWaitForFlush = enforceQueueLimit();
+
+		if (didWaitForFlush) {
+			checkAndPropagateAsyncError();
+		}
 
 		String stream = defaultStream;
 		String partition = defaultPartition;
@@ -327,6 +370,32 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	}
 
 	/**
+	 * If the internal queue of the {@link KinesisProducer} gets too long,
+	 * flush some of the records until we are below the limit again.
+	 * We don't want to flush _all_ records at this point since that would
+	 * break record aggregation.
+	 *
+	 * @return boolean whether flushing occurred or not
+	 */
+	private boolean enforceQueueLimit() {
+		int attempt = 0;
+		while (producer.getOutstandingRecordsCount() >= queueLimit) {
+			backpressureCycles.inc();
+			if (attempt >= 10) {
+				LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt);
+			}
+			attempt++;
+			try {
+				backpressureLatch.await(100);
+			} catch (InterruptedException e) {
+				LOG.warn("Flushing was interrupted.");
+				break;
+			}
+		}
+		return attempt > 0;
+	}
+
+	/**
 	 * A reimplementation of {@link KinesisProducer#flushSync()}.
 	 * This implementation releases the block on flushing if an interruption occurred.
 	 */
@@ -337,7 +406,6 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 				Thread.sleep(500);
 			} catch (InterruptedException e) {
 				LOG.warn("Flushing was interrupted.");
-
 				break;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
new file mode 100644
index 0000000..4dcab33
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+
+@Internal
+public class TimeoutLatch {
+
+	private final Object lock = new Object();
+	private volatile boolean waiting;
+
+	public void await(long timeout) throws InterruptedException {
+		synchronized (lock) {
+			waiting = true;
+			lock.wait(timeout);
+		}
+	}
+
+	public void trigger() {
+		if (waiting) {
+			synchronized (lock) {
+				waiting = false;
+				lock.notifyAll();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 86cefff..6355cf5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -267,6 +268,79 @@ public class FlinkKinesisProducerTest {
 		testHarness.close();
 	}
 
+	/**
+	 * Test ensuring that the producer blocks if the queue limit is exceeded,
+	 * until the queue length drops below the limit;
+	 * we set a timeout because the test will not finish if the logic is broken.
+	 */
+	@Test(timeout = 10000)
+	public void testBackpressure() throws Throwable {
+		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+		producer.setQueueLimit(1);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+				new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		UserRecordResult result = mock(UserRecordResult.class);
+		when(result.isSuccessful()).thenReturn(true);
+
+		CheckedThread msg1 = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				testHarness.processElement(new StreamRecord<>("msg-1"));
+			}
+		};
+		msg1.start();
+		msg1.trySync(100);
+		assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
+
+		// consume msg-1 so that queue is empty again
+		producer.getPendingRecordFutures().get(0).set(result);
+
+		CheckedThread msg2 = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				testHarness.processElement(new StreamRecord<>("msg-2"));
+			}
+		};
+		msg2.start();
+		msg2.trySync(100);
+		assertFalse("Flush triggered before reaching queue limit", msg2.isAlive());
+
+		CheckedThread moreElementsThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				// this should block until msg-2 is consumed
+				testHarness.processElement(new StreamRecord<>("msg-3"));
+				// this should block until msg-3 is consumed
+				testHarness.processElement(new StreamRecord<>("msg-4"));
+			}
+		};
+		moreElementsThread.start();
+
+		moreElementsThread.trySync(100);
+		assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+		// consume msg-2 from the queue, leaving msg-3 in the queue and msg-4 blocked
+		producer.getPendingRecordFutures().get(1).set(result);
+
+		moreElementsThread.trySync(100);
+		assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+		// consume msg-3, blocked msg-4 can be inserted into the queue and block is released
+		producer.getPendingRecordFutures().get(2).set(result);
+
+		moreElementsThread.trySync(100);
+
+		assertFalse("Prodcuer still blocks although the queue is flushed", moreElementsThread.isAlive());
+
+		producer.getPendingRecordFutures().get(3).set(result);
+
+		testHarness.close();
+	}
+
 	// ----------------------------------------------------------------------
 	// Utility test classes
 	// ----------------------------------------------------------------------
@@ -346,7 +420,6 @@ public class FlinkKinesisProducerTest {
 		private List<SettableFuture<UserRecordResult>> pendingRecordFutures = new LinkedList<>();
 
 		private transient MultiShotLatch flushLatch;
-		private boolean isFlushed;
 
 		DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
 			super(schema, TestUtils.getStandardProperties());
@@ -378,13 +451,6 @@ public class FlinkKinesisProducerTest {
 				@Override
 				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 					flushLatch.trigger();
-
-					while (!isAllRecordFuturesCompleted()) {
-						Thread.sleep(50);
-					}
-
-					isFlushed = true;
-
 					return null;
 				}
 			}).when(mockProducer).flush();
@@ -399,12 +465,11 @@ public class FlinkKinesisProducerTest {
 
 		@Override
 		public void snapshotState(FunctionSnapshotContext context) throws Exception {
-			isFlushed = false;
 
 			super.snapshotState(context);
 
 			// if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
-			if (!isFlushed) {
+			if (mockProducer.getOutstandingRecordsCount() > 0) {
 				throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
 			}
 		}
@@ -417,16 +482,6 @@ public class FlinkKinesisProducerTest {
 			flushLatch.await();
 		}
 
-		private boolean isAllRecordFuturesCompleted() {
-			for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
-				if (!future.isDone()) {
-					return false;
-				}
-			}
-
-			return true;
-		}
-
 		private int getNumPendingRecordFutures() {
 			int numPending = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
index f2647cc..7aa177a 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -97,8 +97,19 @@ public abstract class CheckedThread extends Thread {
 	 * exceptions thrown from the {@link #go()} method.
 	 */
 	public void sync(long timeout) throws Exception {
-		join(timeout);
+		trySync(timeout);
 		checkFinished();
+	}
+
+	/**
+	 * Waits with timeout until the thread is completed and checks whether any error
+	 * occurred during the execution.
+	 *
+	 * <p>This method blocks like {@link #join()}, but performs an additional check for
+	 * exceptions thrown from the {@link #go()} method.
+	 */
+	public void trySync(long timeout) throws Exception {
+		join(timeout);
 		checkError();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b725982e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
index 69f73eb..861ea31 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
@@ -52,4 +52,13 @@ public final class MultiShotLatch {
 			triggered = false;
 		}
 	}
+
+	/**
+	 * Checks if the latch was triggered.
+	 *
+	 * @return True, if the latch was triggered, false if not.
+	 */
+	public boolean isTriggered() {
+		return triggered;
+	}
 }