Posted to commits@flink.apache.org by tz...@apache.org on 2018/06/22 07:31:33 UTC

[1/7] flink git commit: [FLINK-9594][E2E tests] Improve docs for new test runner

Repository: flink
Updated Branches:
  refs/heads/master ae8cef3de -> 7d034d4ef


[FLINK-9594][E2E tests] Improve docs for new test runner

This closes #6172.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1f79ee8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1f79ee8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1f79ee8

Branch: refs/heads/master
Commit: f1f79ee85f3488a682dfb5bedb5a779365759bed
Parents: d80f263
Author: Florian Schmidt <fl...@icloud.com>
Authored: Fri Jun 15 11:13:39 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/README.md | 42 +++++++++++++++++++++++++++++++----
 1 file changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1f79ee8/flink-end-to-end-tests/README.md
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/README.md b/flink-end-to-end-tests/README.md
index 82d7e41..26dba3d 100644
--- a/flink-end-to-end-tests/README.md
+++ b/flink-end-to-end-tests/README.md
@@ -28,11 +28,45 @@ where <flink dir> is a Flink distribution directory.
 You can also run tests individually via
 
 ```
-$ FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh
+$ FLINK_DIR=<flink dir> flink-end-to-end-tests/run-single-test.sh your_test.sh arg1 arg2
 ```
 
 ## Writing Tests
 
-Have a look at test_batch_wordcount.sh for a very basic test and
-test_streaming_kafka010.sh for a more involved example. Whenever possible, try
-to put new functionality in common.sh so that it can be reused by other tests.
+### Examples
+Have a look at `test_batch_wordcount.sh` for a very basic test and
+`test_streaming_kafka010.sh` for a more involved example. Whenever possible, try
+to put new functionality in `common.sh` so that it can be reused by other tests.
+
+### Adding a test case
+In order to add a new test case you need to add it to `run-nightly-tests.sh` and / or `run-pre-commit-tests.sh`. Templates on how to add tests can be found in those respective files.
+
+_Note: If you want to parameterize your tests please do so by adding multiple test cases with parameters as arguments to the nightly / pre-commit test suites. This allows the test runner to do a cleanup in between each individual test and also to fail those tests individually._
+
+### Passing your test
+A test is considered to have passed if:
+- it exits with code 0
+- there are no non-empty .out files (nothing was written to stdout / stderr by your Flink program)
+- there are no exceptions in the log files
+- there are no errors in the log files
+
+_Note: There is a whitelist for exceptions and errors that do not lead to failure, which can be found in the `check_logs_for_errors` and `check_logs_for_exceptions` functions in `test-scripts/common.sh`._
+
+Please note that a previously supported pattern, where you could assign a value to the global variable `PASS` to have your tests fail, **is not supported anymore**.
+
+### Cleanup
+The test runner performs a cleanup after each test case, which includes:
+- Stopping the cluster
+- Killing all task and job managers
+- Reverting config to default (if changed before)
+- Cleaning up log and temp directories
+
+In some cases your test is required to do some *additional* cleanup, for example shutting down external systems like Kafka or Elasticsearch. In this case it is a common pattern to trap a `test_cleanup` function on `EXIT` like this:
+
+```sh
+function test_cleanup {
+    # do your custom cleanup here
+}
+
+trap test_cleanup EXIT
+```

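To make the "Adding a test case" and "Cleanup" notes above concrete, here is a hedged sketch. The test name, script path, container name and scratch directory below are illustrative assumptions, not part of this commit.

```sh
# Hypothetical entries in run-nightly-tests.sh: the same script is registered
# twice with different arguments, so the runner can clean up between the two
# runs and fail each parameterization individually.
run_test "My feature test (parallelism 2)" "$END_TO_END_DIR/test-scripts/test_my_feature.sh 2"
run_test "My feature test (parallelism 4)" "$END_TO_END_DIR/test-scripts/test_my_feature.sh 4"
```

```sh
# Hypothetical cleanup inside test-scripts/test_my_feature.sh: stop an external
# system the test started itself and remove its scratch data; the runner's own
# cleanup still handles the Flink cluster, config and logs.
function test_cleanup {
    docker stop my-feature-broker > /dev/null 2>&1 || true
    rm -rf /tmp/my-feature-output
}

trap test_cleanup EXIT
```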

[2/7] flink git commit: [FLINK-9638][E2E Tests] Add helper script to run single test

Posted by tz...@apache.org.
[FLINK-9638][E2E Tests] Add helper script to run single test

This commit adds a helper script `run-single-test.sh` that allows you to run
a single test in the context of the test runner.

This provides you with
* Setup of ENV variables
* Cleanup after the test
* Nicer output

Usage: ./run-single-test.sh <path-to-test-script> [<arg1> <arg2> ...]
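For example (the distribution path is a placeholder, following the README's `<flink dir>` convention):

```sh
# Run one existing test script through the runner from the
# flink-end-to-end-tests directory; FLINK_DIR must point to a built
# Flink distribution.
FLINK_DIR=<flink dir> ./run-single-test.sh test-scripts/test_batch_wordcount.sh
```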

This closes #6197.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80f263f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80f263f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80f263f

Branch: refs/heads/master
Commit: d80f263f2e36acccf4dcae06f0cfe1717d09f58d
Parents: cc9ccbb
Author: Florian Schmidt <fl...@icloud.com>
Authored: Thu Jun 21 15:42:08 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/run-single-test.sh | 52 ++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d80f263f/flink-end-to-end-tests/run-single-test.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-single-test.sh b/flink-end-to-end-tests/run-single-test.sh
new file mode 100755
index 0000000..86b313d
--- /dev/null
+++ b/flink-end-to-end-tests/run-single-test.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This can be used to run a single test in the context of the test runner
+# Usage: ./run-single-test.sh test-scripts/my-test-case.sh
+
+if [ $# -eq 0 ]; then
+    echo "Usage: ./run-single-test.sh <path-to-test-script> [<arg1> <arg2> ...]"
+    exit 1
+fi
+
+END_TO_END_DIR="`dirname \"$0\"`" # relative
+END_TO_END_DIR="`( cd \"$END_TO_END_DIR\" && pwd )`" # absolutized and normalized
+if [ -z "$END_TO_END_DIR" ] ; then
+    # error; for some reason, the path is not accessible
+    # to the script (e.g. permissions re-evaled after suid)
+    exit 1  # fail
+fi
+
+export END_TO_END_DIR
+
+if [ -z "$FLINK_DIR" ] ; then
+    echo "You have to export the Flink distribution directory as FLINK_DIR"
+    exit 1
+fi
+
+source "${END_TO_END_DIR}/test-scripts/test-runner-common.sh"
+
+FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd )`" # absolutized and normalized
+
+echo "flink-end-to-end-test directory: $END_TO_END_DIR"
+echo "Flink distribution directory: $FLINK_DIR"
+
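+# run_test expects a test description followed by the command to execute; here the
+# raw arguments are passed through as both.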
+run_test "$*" "$*"
+
+exit 0


[6/7] flink git commit: [FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints

Posted by tz...@apache.org.
[FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints

This closes #6038.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc9ccbbc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc9ccbbc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc9ccbbc

Branch: refs/heads/master
Commit: cc9ccbbc265774947a08e8a07a63449172a79ab5
Parents: cacde0f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 18 00:33:12 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh     | 24 ++++++---
 .../test_resume_externalized_checkpoints.sh     | 56 +++++++++++++++-----
 2 files changed, 60 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc9ccbbc/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index c4c5069..302e27f 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -61,13 +61,23 @@ run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$E
 run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
 run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
 
-run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true"
-run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false"
-run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks"
-
-run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
-run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
-run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true"
+run_test "Resuming Externalized Checkpoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file false true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true true"
+
+run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true true"
 
 run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
 run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"

http://git-wip-us.apache.org/repos/asf/flink/blob/cc9ccbbc/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 6472b23..29c3786 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -17,13 +17,28 @@
 # limitations under the License.
 ################################################################################
 
+if [ -z $1 ] || [ -z $2 ]; then
+ echo "Usage: ./test_resume_externalized_checkpoints.sh <original_dop> <new_dop> <state_backend_setting> <state_backend_file_async_setting> <state_backend_rocks_incremental_setting>"
+ exit 1
+fi
+
 source "$(dirname "$0")"/common.sh
 
-STATE_BACKEND_TYPE=${1:-file}
-STATE_BACKEND_FILE_ASYNC=${2:-true}
-SIMULATE_FAILURE=${3:-false}
+ORIGINAL_DOP=$1
+NEW_DOP=$2
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${5:-false}
+SIMULATE_FAILURE=${6:-false}
+
+if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+ NUM_SLOTS=$ORIGINAL_DOP
+else
+ NUM_SLOTS=$NEW_DOP
+fi
 
 backup_config
+change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
 setup_flink_slf4j_metric_reporter
 start_cluster
 
@@ -43,18 +58,30 @@ CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
 
 # run the DataStream allroundjob
 
-echo "Running externalized checkpoints test, with STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
+echo "Running externalized checkpoints test, \
+with ORIGINAL_DOP=$ORIGINAL_DOP NEW_DOP=$NEW_DOP \
+and STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC \
+STATE_BACKEND_ROCKSDB_INCREMENTAL=$STATE_BACKEND_ROCKS_INCREMENTAL SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
 
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-BASE_JOB_CMD="$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
-  --test.semantics exactly-once \
-  --environment.externalize_checkpoint true \
-  --environment.externalize_checkpoint.cleanup retain \
-  --state_backend $STATE_BACKEND_TYPE \
-  --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
-  --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
-  --sequence_generator_source.sleep_time 15 \
-  --sequence_generator_source.sleep_after_elements 1"
+
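+# Build the job submission command for the given parallelism (dop); used once with
+# ORIGINAL_DOP for the initial run and again with NEW_DOP when restoring from the
+# externalized checkpoint.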
+function buildBaseJobCmd {
+  local dop=$1
+
+  echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+    --test.semantics exactly-once \
+    --environment.parallelism $dop \
+    --environment.externalize_checkpoint true \
+    --environment.externalize_checkpoint.cleanup retain \
+    --state_backend $STATE_BACKEND_TYPE \
+    --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
+    --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+    --state_backend.rocks.incremental $STATE_BACKEND_ROCKS_INCREMENTAL \
+    --sequence_generator_source.sleep_time 15 \
+    --sequence_generator_source.sleep_after_elements 1"
+}
+
+BASE_JOB_CMD=`buildBaseJobCmd $ORIGINAL_DOP`
 
 JOB_CMD=""
 if [[ $SIMULATE_FAILURE == "true" ]]; then
@@ -98,6 +125,9 @@ if (( $NUM_CHECKPOINTS > 1 )); then
 fi
 
 echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
+
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+
 DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running $DATASTREAM_JOB
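As a usage note (not part of this commit), one of the new rescaling variants can also be run on its own via the single-test runner introduced earlier in this series; the distribution path is a placeholder:

```sh
# Resume from an externalized checkpoint while scaling from parallelism 2 to 4,
# RocksDB backend with incremental checkpoints enabled, no simulated failure.
FLINK_DIR=<flink dir> ./run-single-test.sh \
    test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true true
```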


[3/7] flink git commit: [FLINK-8795] Fixed local scala shell for Flip6

Posted by tz...@apache.org.
[FLINK-8795] Fixed local scala shell for Flip6

This closes #6182.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1556aa93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1556aa93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1556aa93

Branch: refs/heads/master
Commit: 1556aa9359f11da00892420bad737f0597d08154
Parents: ef9e837
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Tue Jun 19 09:49:31 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/FlinkShell.scala | 36 +++++++---
 .../flink/api/scala/ScalaShellITCase.scala      | 72 +++++++++++++-------
 .../scala/ScalaShellLocalStartupITCase.scala    | 14 +++-
 3 files changed, 87 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1556aa93/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index dbdc052..b74a8a0 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -23,9 +23,9 @@ import java.io._
 import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser}
 import org.apache.flink.client.deployment.ClusterDescriptor
 import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.configuration.{Configuration, GlobalConfiguration, JobManagerOptions}
+import org.apache.flink.configuration.{Configuration, CoreOptions, GlobalConfiguration, JobManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
+import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.tools.nsc.Settings
@@ -137,20 +137,36 @@ object FlinkShell {
     }
   }
 
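+  // A local cluster is either the legacy StandaloneMiniCluster (Left) or the new FLIP-6 MiniCluster (Right).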
+  private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]
+
   def fetchConnectionInfo(
     configuration: Configuration,
     config: Config
-  ): (String, Int, Option[Either[StandaloneMiniCluster, ClusterClient[_]]]) = {
+  ): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = configuration
         config.setInteger(JobManagerOptions.PORT, 0)
 
-        val miniCluster = new StandaloneMiniCluster(config)
+        val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
+          case CoreOptions.LEGACY_MODE => {
+            val cluster = new StandaloneMiniCluster(config)
+
+            (Left(cluster), cluster.getPort)
+          }
+          case CoreOptions.NEW_MODE => {
+            val miniClusterConfig = new MiniClusterConfiguration.Builder()
+              .setConfiguration(config)
+              .build()
+            val cluster = new MiniCluster(miniClusterConfig)
+            cluster.start()
+
+            (Right(cluster), cluster.getRestAddress.getPort)
+          }
+        }
 
-        println("\nStarting local Flink cluster (host: localhost, " +
-          s"port: ${miniCluster.getPort}).\n")
-        ("localhost", miniCluster.getPort, Some(Left(miniCluster)))
+        println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+        ("localhost", port, Some(Left(miniCluster)))
 
       case ExecutionMode.REMOTE => // Remote mode
         if (config.host.isEmpty || config.port.isEmpty) {
@@ -193,7 +209,8 @@ object FlinkShell {
     val (repl, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(miniCluster)) => miniCluster.getConfiguration
+        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
+        case Some(Left(Right(_))) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -223,7 +240,8 @@ object FlinkShell {
     } finally {
       repl.closeInterpreter()
       cluster match {
-        case Some(Left(miniCluster)) => miniCluster.close()
+        case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
+        case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
         case Some(Right(yarnCluster)) => yarnCluster.shutdown()
         case _ =>
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/1556aa93/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 2e07fb9..12522a8 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,19 +19,16 @@
 package org.apache.flink.api.scala
 
 import java.io._
+import java.util.Objects
 
-import akka.actor.ActorRef
-import akka.pattern.Patterns
-import org.apache.flink.configuration.{Configuration, CoreOptions, TaskManagerOptions}
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
+import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
+import org.apache.flink.test.util.MiniClusterResource
 import org.apache.flink.util.TestLogger
-import org.junit.rules.TemporaryFolder
 import org.junit._
+import org.junit.rules.TemporaryFolder
 
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.FiniteDuration
 import scala.tools.nsc.Settings
 
 class ScalaShellITCase extends TestLogger {
@@ -280,11 +277,11 @@ class ScalaShellITCase extends TestLogger {
     BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
 
     val args = cluster match {
-      case Some(cl) =>
+      case Some(_) =>
         Array(
           "remote",
-          cl.getHostname,
-          Integer.toString(cl.getPort),
+          hostname,
+          Integer.toString(port),
           "--configDir",
           dir.getAbsolutePath)
       case None => throw new IllegalStateException("Cluster has not been started.")
@@ -314,17 +311,43 @@ class ScalaShellITCase extends TestLogger {
 }
 
 object ScalaShellITCase {
-  var cluster: Option[StandaloneMiniCluster] = None
 
-  val parallelism = 4
   val configuration = new Configuration()
+  var cluster: Option[Either[MiniCluster, StandaloneMiniCluster]] = None
+
+  var port: Int = _
+  var hostname : String = _
+  val parallelism: Int = 4
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-    configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-
-    cluster = Option(new StandaloneMiniCluster(configuration))
+    val isNew = Objects.equals(MiniClusterResource.NEW_CODEBASE,
+      System.getProperty(MiniClusterResource.CODEBASE_KEY))
+    if (isNew) {
+      configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+      // set to different than default so not to interfere with ScalaShellLocalStartupITCase
+      configuration.setInteger(RestOptions.PORT, 8082)
+      val miniConfig = new MiniClusterConfiguration.Builder()
+        .setConfiguration(configuration)
+        .setNumSlotsPerTaskManager(parallelism)
+        .build()
+
+      val miniCluster = new MiniCluster(miniConfig)
+      miniCluster.start()
+      port = miniCluster.getRestAddress.getPort
+      hostname = miniCluster.getRestAddress.getHost
+
+      cluster = Some(Left(miniCluster))
+    } else {
+      configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
+      configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
+      val standaloneCluster = new StandaloneMiniCluster(configuration)
+
+      hostname = standaloneCluster.getHostname
+      port = standaloneCluster.getPort
+
+      cluster = Some(Right(standaloneCluster))
+    }
   }
 
   @AfterClass
@@ -332,7 +355,10 @@ object ScalaShellITCase {
     // The Scala interpreter somehow changes the class loader. Therefore, we have to reset it
     Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
 
-    cluster.foreach(_.close)
+    cluster.foreach {
+      case Left(miniCluster) => miniCluster.close()
+      case Right(miniCluster) => miniCluster.close()
+    }
   }
 
   /**
@@ -350,18 +376,18 @@ object ScalaShellITCase {
     System.setOut(new PrintStream(baos))
 
     cluster match {
-      case Some(cl) =>
+      case Some(_) =>
         val repl = externalJars match {
           case Some(ej) => new FlinkILoop(
-            cl.getHostname,
-            cl.getPort,
+            hostname,
+            port,
             configuration,
             Option(Array(ej)),
             in, new PrintWriter(out))
 
           case None => new FlinkILoop(
-            cl.getHostname,
-            cl.getPort,
+            hostname,
+            port,
             configuration,
             in, new PrintWriter(out))
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/1556aa93/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index 9365948..a971db8 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -19,12 +19,14 @@
 package org.apache.flink.api.scala
 
 import java.io._
+import java.util.Objects
 
 import org.apache.flink.configuration.{Configuration, CoreOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
+import org.apache.flink.test.util.MiniClusterResource
 import org.apache.flink.util.TestLogger
-import org.junit.{Assert, Rule, Test}
 import org.junit.rules.TemporaryFolder
+import org.junit.{Assert, Rule, Test}
 
 class ScalaShellLocalStartupITCase extends TestLogger {
 
@@ -85,7 +87,13 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     System.setOut(new PrintStream(baos))
 
     val configuration = new Configuration()
-    configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
+    val mode = if (Objects.equals(MiniClusterResource.NEW_CODEBASE,
+      System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+      CoreOptions.NEW_MODE
+    } else {
+      CoreOptions.LEGACY_MODE
+    }
+    configuration.setString(CoreOptions.MODE, mode)
 
     val dir = temporaryFolder.newFolder()
     BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
@@ -93,7 +101,7 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     val args: Array[String] = Array("local", "--configDir", dir.getAbsolutePath)
 
     //start flink scala shell
-    FlinkShell.bufferedReader = Some(in);
+    FlinkShell.bufferedReader = Some(in)
     FlinkShell.main(args)
 
     baos.flush()


[5/7] flink git commit: [hotfix] [docs] Fix a typo in index.md

Posted by tz...@apache.org.
[hotfix] [docs] Fix a typo in index.md

This closes #6193.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cacde0f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cacde0f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cacde0f3

Branch: refs/heads/master
Commit: cacde0f3f567302d933ebf09c929d73e0c5c9d56
Parents: 1556aa9
Author: MichealShin <lu...@126.com>
Authored: Thu Jun 21 16:50:34 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200

----------------------------------------------------------------------
 docs/index.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cacde0f3/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 78ee7d0..419bb39 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -31,7 +31,7 @@ Apache Flink is an open source platform for distributed stream and batch data pr
 
 ## First Steps
 
-- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommended you read these sections first.
+- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommend you read these sections first.
 
 - **Quickstarts**: [Run an example program](quickstart/setup_quickstart.html) on your local machine or [study some examples](examples/index.html).
 


[7/7] flink git commit: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer backpressuring

Posted by tz...@apache.org.
[FLINK-9374] [kinesis] Enable FlinkKinesisProducer backpressuring

This closes #9374.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d034d4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d034d4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d034d4e

Branch: refs/heads/master
Commit: 7d034d4ef6986ba5ccda6f5e8c587b8fdd88be8e
Parents: f1f79ee
Author: Franz Thoma <fr...@tngtech.com>
Authored: Wed May 9 08:27:47 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:58 2018 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 30 +++++++
 .../kinesis/FlinkKinesisProducer.java           | 70 ++++++++++++++-
 .../connectors/kinesis/util/TimeoutLatch.java   | 44 +++++++++
 .../kinesis/FlinkKinesisProducerTest.java       | 95 +++++++++++++++-----
 .../flink/core/testutils/CheckedThread.java     | 13 ++-
 .../flink/core/testutils/MultiShotLatch.java    |  9 ++
 6 files changed, 239 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 7551142..03224a1 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -344,6 +344,36 @@ Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL fr
 
 Users can still switch back to one-thread-per-request mode by setting a key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`, as shown in the code commented out in above example.
 
+### Backpressure
+
+By default, `FlinkKinesisProducer` does not backpressure. Instead, records that
+cannot be sent because of the rate restriction of 1 MB per second per shard are
+buffered in an unbounded queue and dropped when their `RecordTtl` expires.
+
+To avoid data loss, you can enable backpressuring by restricting the size of the
+internal queue:
+
+```
+// 200 Bytes per record, 1 shard
+kinesis.setQueueLimit(500);
+```
+
+The value for `queueLimit` depends on the expected record size. To choose a good
+value, consider that Kinesis is rate-limited to 1MB per second per shard. If
+less than one second's worth of records is buffered, then the queue may not be
+able to operate at full capacity. With the default `RecordMaxBufferedTime` of
+100ms, a queue size of 100kB per shard should be sufficient. The `queueLimit`
+can then be computed via
+
+```
+queue limit = (number of shards * queue size per shard) / record size
+```
+
+E.g. for 200Bytes per record and 8 shards, a queue limit of 4000 is a good
+starting point. If the queue size limits throughput (below 1MB per second per
+shard), try increasing the queue limit slightly.
+
+
 ## Using Non-AWS Kinesis Endpoints for Testing
 
 It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as

http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index a9b48ae..b086ac1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -21,12 +21,15 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.TimeoutLatch;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.amazonaws.services.kinesis.producer.Attempt;
@@ -55,6 +58,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
 
+	public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
+
+	public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
+
+	public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
+
 	private static final long serialVersionUID = 6447077318449477846L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
@@ -65,6 +74,9 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	/* Flag controlling the error behavior of the producer */
 	private boolean failOnError = false;
 
+	/* Maximum length of the internal record queue before backpressuring */
+	private int queueLimit = Integer.MAX_VALUE;
+
 	/* Name of the default stream to produce to. Can be overwritten by the serialization schema */
 	private String defaultStream;
 
@@ -82,9 +94,15 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	/* Our Kinesis instance for each parallel Flink sink */
 	private transient KinesisProducer producer;
 
+	/* Backpressuring waits for this latch, triggered by record callback */
+	private transient TimeoutLatch backpressureLatch;
+
 	/* Callback handling failures */
 	private transient FutureCallback<UserRecordResult> callback;
 
+	/* Counts how often we have to wait for KPL because we are above the queue limit */
+	private transient Counter backpressureCycles;
+
 	/* Field for async exception */
 	private transient volatile Throwable thrownException;
 
@@ -145,6 +163,18 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	}
 
 	/**
+	 * The {@link KinesisProducer} holds an unbounded queue internally. To avoid memory
+	 * problems under high loads, a limit can be employed above which the internal queue
+	 * will be flushed, thereby applying backpressure.
+	 *
+	 * @param queueLimit The maximum length of the internal queue before backpressuring
+	 */
+	public void setQueueLimit(int queueLimit) {
+		checkArgument(queueLimit > 0, "queueLimit must be a positive number");
+		this.queueLimit = queueLimit;
+	}
+
+	/**
 	 * Set a default stream name.
 	 * @param defaultStream Name of the default Kinesis stream
 	 */
@@ -180,9 +210,16 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
 
 		producer = getKinesisProducer(producerConfig);
+
+		final MetricGroup kinesisMectricGroup = getRuntimeContext().getMetricGroup().addGroup(KINESIS_PRODUCER_METRIC_GROUP);
+		this.backpressureCycles = kinesisMectricGroup.counter(METRIC_BACKPRESSURE_CYCLES);
+		kinesisMectricGroup.gauge(METRIC_OUTSTANDING_RECORDS_COUNT, producer::getOutstandingRecordsCount);
+
+		backpressureLatch = new TimeoutLatch();
 		callback = new FutureCallback<UserRecordResult>() {
 			@Override
 			public void onSuccess(UserRecordResult result) {
+				backpressureLatch.trigger();
 				if (!result.isSuccessful()) {
 					if (failOnError) {
 						// only remember the first thrown exception
@@ -197,6 +234,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 
 			@Override
 			public void onFailure(Throwable t) {
+				backpressureLatch.trigger();
 				if (failOnError) {
 					thrownException = t;
 				} else {
@@ -219,6 +257,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 		}
 
 		checkAndPropagateAsyncError();
+		boolean didWaitForFlush = enforceQueueLimit();
+
+		if (didWaitForFlush) {
+			checkAndPropagateAsyncError();
+		}
 
 		String stream = defaultStream;
 		String partition = defaultPartition;
@@ -327,6 +370,32 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 	}
 
 	/**
+	 * If the internal queue of the {@link KinesisProducer} gets too long,
+	 * flush some of the records until we are below the limit again.
+	 * We don't want to flush _all_ records at this point since that would
+	 * break record aggregation.
+	 *
+	 * @return boolean whether flushing occurred or not
+	 */
+	private boolean enforceQueueLimit() {
+		int attempt = 0;
+		while (producer.getOutstandingRecordsCount() >= queueLimit) {
+			backpressureCycles.inc();
+			if (attempt >= 10) {
+				LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt);
+			}
+			attempt++;
+			try {
+				backpressureLatch.await(100);
+			} catch (InterruptedException e) {
+				LOG.warn("Flushing was interrupted.");
+				break;
+			}
+		}
+		return attempt > 0;
+	}
+
+	/**
 	 * A reimplementation of {@link KinesisProducer#flushSync()}.
 	 * This implementation releases the block on flushing if an interruption occurred.
 	 */
@@ -337,7 +406,6 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
 				Thread.sleep(500);
 			} catch (InterruptedException e) {
 				LOG.warn("Flushing was interrupted.");
-
 				break;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
new file mode 100644
index 0000000..4dcab33
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+
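+/**
+ * A latch that a caller can repeatedly await with a timeout; {@code trigger()} wakes up
+ * a currently waiting caller early and is a no-op otherwise.
+ */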
+@Internal
+public class TimeoutLatch {
+
+	private final Object lock = new Object();
+	private volatile boolean waiting;
+
+	public void await(long timeout) throws InterruptedException {
+		synchronized (lock) {
+			waiting = true;
+			lock.wait(timeout);
+		}
+	}
+
+	public void trigger() {
+		if (waiting) {
+			synchronized (lock) {
+				waiting = false;
+				lock.notifyAll();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 86cefff..6355cf5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -267,6 +268,79 @@ public class FlinkKinesisProducerTest {
 		testHarness.close();
 	}
 
+	/**
+	 * Test ensuring that the producer blocks if the queue limit is exceeded,
+	 * until the queue length drops below the limit;
+	 * we set a timeout because the test will not finish if the logic is broken.
+	 */
+	@Test(timeout = 10000)
+	public void testBackpressure() throws Throwable {
+		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+		producer.setQueueLimit(1);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+				new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		UserRecordResult result = mock(UserRecordResult.class);
+		when(result.isSuccessful()).thenReturn(true);
+
+		CheckedThread msg1 = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				testHarness.processElement(new StreamRecord<>("msg-1"));
+			}
+		};
+		msg1.start();
+		msg1.trySync(100);
+		assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
+
+		// consume msg-1 so that queue is empty again
+		producer.getPendingRecordFutures().get(0).set(result);
+
+		CheckedThread msg2 = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				testHarness.processElement(new StreamRecord<>("msg-2"));
+			}
+		};
+		msg2.start();
+		msg2.trySync(100);
+		assertFalse("Flush triggered before reaching queue limit", msg2.isAlive());
+
+		CheckedThread moreElementsThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				// this should block until msg-2 is consumed
+				testHarness.processElement(new StreamRecord<>("msg-3"));
+				// this should block until msg-3 is consumed
+				testHarness.processElement(new StreamRecord<>("msg-4"));
+			}
+		};
+		moreElementsThread.start();
+
+		moreElementsThread.trySync(100);
+		assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+		// consume msg-2 from the queue, leaving msg-3 in the queue and msg-4 blocked
+		producer.getPendingRecordFutures().get(1).set(result);
+
+		moreElementsThread.trySync(100);
+		assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+		// consume msg-3, blocked msg-4 can be inserted into the queue and block is released
+		producer.getPendingRecordFutures().get(2).set(result);
+
+		moreElementsThread.trySync(100);
+
+		assertFalse("Prodcuer still blocks although the queue is flushed", moreElementsThread.isAlive());
+
+		producer.getPendingRecordFutures().get(3).set(result);
+
+		testHarness.close();
+	}
+
 	// ----------------------------------------------------------------------
 	// Utility test classes
 	// ----------------------------------------------------------------------
@@ -346,7 +420,6 @@ public class FlinkKinesisProducerTest {
 		private List<SettableFuture<UserRecordResult>> pendingRecordFutures = new LinkedList<>();
 
 		private transient MultiShotLatch flushLatch;
-		private boolean isFlushed;
 
 		DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
 			super(schema, TestUtils.getStandardProperties());
@@ -378,13 +451,6 @@ public class FlinkKinesisProducerTest {
 				@Override
 				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 					flushLatch.trigger();
-
-					while (!isAllRecordFuturesCompleted()) {
-						Thread.sleep(50);
-					}
-
-					isFlushed = true;
-
 					return null;
 				}
 			}).when(mockProducer).flush();
@@ -399,12 +465,11 @@ public class FlinkKinesisProducerTest {
 
 		@Override
 		public void snapshotState(FunctionSnapshotContext context) throws Exception {
-			isFlushed = false;
 
 			super.snapshotState(context);
 
 			// if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
-			if (!isFlushed) {
+			if (mockProducer.getOutstandingRecordsCount() > 0) {
 				throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
 			}
 		}
@@ -417,16 +482,6 @@ public class FlinkKinesisProducerTest {
 			flushLatch.await();
 		}
 
-		private boolean isAllRecordFuturesCompleted() {
-			for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
-				if (!future.isDone()) {
-					return false;
-				}
-			}
-
-			return true;
-		}
-
 		private int getNumPendingRecordFutures() {
 			int numPending = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
index f2647cc..7aa177a 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -97,8 +97,19 @@ public abstract class CheckedThread extends Thread {
 	 * exceptions thrown from the {@link #go()} method.
 	 */
 	public void sync(long timeout) throws Exception {
-		join(timeout);
+		trySync(timeout);
 		checkFinished();
+	}
+
+	/**
+	 * Waits with timeout until the thread is completed and checks whether any error
+	 * occurred during the execution.
+	 *
+	 * <p>This method blocks like {@link #join()}, but performs an additional check for
+	 * exceptions thrown from the {@link #go()} method.
+	 */
+	public void trySync(long timeout) throws Exception {
+		join(timeout);
 		checkError();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
index 69f73eb..861ea31 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
@@ -52,4 +52,13 @@ public final class MultiShotLatch {
 			triggered = false;
 		}
 	}
+
+	/**
+	 * Checks if the latch was triggered.
+	 *
+	 * @return True, if the latch was triggered, false if not.
+	 */
+	public boolean isTriggered() {
+		return triggered;
+	}
 }


[4/7] flink git commit: [FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream for shard discovery as it offers higher rate limits

Posted by tz...@apache.org.
[FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream for shard discovery as it offers higher rate limits

This closes #5992


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef9e8378
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef9e8378
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef9e8378

Branch: refs/heads/master
Commit: ef9e8378929b2f068c71262ed93702da0b023b04
Parents: ae8cef3
Author: Kailash HD <kd...@lyft.com>
Authored: Mon Mar 26 09:42:25 2018 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200

----------------------------------------------------------------------
 .../kinesis/config/ConsumerConfigConstants.java |  36 +++++
 .../connectors/kinesis/proxy/KinesisProxy.java  | 134 +++++++++--------
 .../kinesis/util/KinesisConfigUtil.java         |  35 ++++-
 .../kinesis/proxy/KinesisProxyTest.java         | 143 +++++++++++++++++++
 .../kinesis/util/KinesisConfigUtilTest.java     |  18 +--
 5 files changed, 295 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 38a0e3d..e46f79e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -65,15 +65,42 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
 	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
+	/**
+	 * Deprecated key.
+	 *
+	 * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_BASE} instead
+	 **/
+	@Deprecated
 	/** The base backoff time between each describeStream attempt. */
 	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
 
+	/**
+	 * Deprecated key.
+	 *
+	 * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_MAX} instead
+	 **/
+	@Deprecated
 	/** The maximum backoff time between each describeStream attempt. */
 	public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
 
+	/**
+	 * Deprecated key.
+	 *
+	 * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT} instead
+	 **/
+	@Deprecated
 	/** The power constant for exponential backoff between each describeStream attempt. */
 	public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
 
+	/** The base backoff time between each listShards attempt. */
+	public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";
+
+	/** The maximum backoff time between each listShards attempt. */
+	public static final String LIST_SHARDS_BACKOFF_MAX = "flink.list.shards.backoff.max";
+
+	/** The power constant for exponential backoff between each listShards attempt. */
+	public static final String LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.list.shards.backoff.expconst";
+
 	/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
 	public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
 
@@ -115,12 +142,21 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
 
+	@Deprecated
 	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
 
+	@Deprecated
 	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
 
+	@Deprecated
 	public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
+	public static final long DEFAULT_LIST_SHARDS_BACKOFF_BASE = 1000L;
+
+	public static final long DEFAULT_LIST_SHARDS_BACKOFF_MAX = 5000L;
+
+	public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
 	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;
 
 	public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;

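The new LIST_SHARDS_BACKOFF_* keys are drop-in replacements for the deprecated STREAM_DESCRIBE_BACKOFF_* keys. As a rough illustration only (not part of this commit; the class name, region and credential values below are placeholders), a consumer configuration that sets the new keys to their default values could look like this:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class ListShardsBackoffConfigSketch {

	public static Properties consumerProperties() {
		Properties config = new Properties();
		// Placeholder region and credentials.
		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "access-key-id");
		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secret-access-key");

		// Backoff settings for the listShards calls introduced above; the values shown equal the defaults.
		config.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE, "1000");
		config.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX, "5000");
		config.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");
		return config;
	}
}
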
http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 09e9d4c..7e6a360 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -21,24 +21,27 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredNextTokenException;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.InvalidArgumentException;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.ResourceInUseException;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.amazonaws.services.kinesis.model.StreamStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,17 +79,17 @@ public class KinesisProxy implements KinesisProxyInterface {
 	private static final Random seed = new Random();
 
 	// ------------------------------------------------------------------------
-	//  describeStream() related performance settings
+	//  listShards() related performance settings
 	// ------------------------------------------------------------------------
 
-	/** Base backoff millis for the describe stream operation. */
-	private final long describeStreamBaseBackoffMillis;
+	/** Base backoff millis for the list shards operation. */
+	private final long listShardsBaseBackoffMillis;
 
-	/** Maximum backoff millis for the describe stream operation. */
-	private final long describeStreamMaxBackoffMillis;
+	/** Maximum backoff millis for the list shards operation. */
+	private final long listShardsMaxBackoffMillis;
 
-	/** Exponential backoff power constant for the describe stream operation. */
-	private final double describeStreamExpConstant;
+	/** Exponential backoff power constant for the list shards operation. */
+	private final double listShardsExpConstant;
 
 	// ------------------------------------------------------------------------
 	//  getRecords() related performance settings
@@ -127,21 +130,22 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 */
 	protected KinesisProxy(Properties configProps) {
 		checkNotNull(configProps);
+		KinesisConfigUtil.replaceDeprecatedConsumerKeys(configProps);
 
 		this.kinesisClient = createKinesisClient(configProps);
 
-		this.describeStreamBaseBackoffMillis = Long.valueOf(
+		this.listShardsBaseBackoffMillis = Long.valueOf(
 			configProps.getProperty(
-				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
-				Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
-		this.describeStreamMaxBackoffMillis = Long.valueOf(
+				ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_BASE)));
+		this.listShardsMaxBackoffMillis = Long.valueOf(
 			configProps.getProperty(
-				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
-				Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
-		this.describeStreamExpConstant = Double.valueOf(
+				ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_MAX)));
+		this.listShardsExpConstant = Double.valueOf(
 			configProps.getProperty(
-				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
-				Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
+				ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
 
 		this.getRecordsBaseBackoffMillis = Long.valueOf(
 			configProps.getProperty(
@@ -353,19 +357,24 @@ public class KinesisProxy implements KinesisProxyInterface {
 	private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
 		List<StreamShardHandle> shardsOfStream = new ArrayList<>();
 
-		DescribeStreamResult describeStreamResult;
+		// List Shards returns just the first 1000 shard entries. In order to read the entire stream,
+		// we need to use the returned nextToken to get additional shards.
+		ListShardsResult listShardsResult;
+		String startShardToken = null;
 		do {
-			describeStreamResult = describeStream(streamName, lastSeenShardId);
-
-			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+			listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
+			if (listShardsResult == null) {
+				// If an exception occurred while retrieving the shards, make sure that an incomplete shard list is not returned;
+				// clear the partially collected shards before returning.
+				shardsOfStream.clear();
+				return shardsOfStream;
+			}
+			List<Shard> shards = listShardsResult.getShards();
 			for (Shard shard : shards) {
 				shardsOfStream.add(new StreamShardHandle(streamName, shard));
 			}
-
-			if (shards.size() != 0) {
-				lastSeenShardId = shards.get(shards.size() - 1).getShardId();
-			}
-		} while (describeStreamResult.getStreamDescription().isHasMoreShards());
+			startShardToken = listShardsResult.getNextToken();
+		} while (startShardToken != null);
 
 		return shardsOfStream;
 	}
@@ -382,41 +391,55 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
 	 * @return the result of the describe stream operation
 	 */
-	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
-		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
-		describeStreamRequest.setStreamName(streamName);
-		describeStreamRequest.setExclusiveStartShardId(startShardId);
+	private ListShardsResult listShards(String streamName, @Nullable String startShardId,
+																			@Nullable String startNextToken)
+			throws InterruptedException {
+		final ListShardsRequest listShardsRequest = new ListShardsRequest();
+		if (startNextToken == null) {
+			listShardsRequest.setExclusiveStartShardId(startShardId);
+			listShardsRequest.setStreamName(streamName);
+		} else {
+			// Note the nextToken returned by AWS expires within 300 sec.
+			listShardsRequest.setNextToken(startNextToken);
+		}
 
-		DescribeStreamResult describeStreamResult = null;
+		ListShardsResult listShardsResults = null;
 
-		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+		// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
 		int attemptCount = 0;
-		while (describeStreamResult == null) { // retry until we get a result
+		// ListShards returns at most 1000 shard entries per call. Make sure that all entries
+		// are eventually retrieved.
+		while (listShardsResults == null) { // retry until we get a result
 			try {
-				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+
+				listShardsResults = kinesisClient.listShards(listShardsRequest);
 			} catch (LimitExceededException le) {
 				long backoffMillis = fullJitterBackoff(
-					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
-					+ backoffMillis + " millis.");
+					listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+				LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
+					+ ". Backing off for " + backoffMillis + " millis.");
 				Thread.sleep(backoffMillis);
-			} catch (ResourceNotFoundException re) {
-				throw new RuntimeException("Error while getting stream details", re);
-			}
-		}
-
-		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
-		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
-					"describeStream operation will not contain any shard information.");
+			} catch (ResourceInUseException reInUse) {
+				// ListShards throws this exception if the stream is not in ACTIVE state; stop retrying and reuse the previously available state.
+				if (LOG.isWarnEnabled()) {
+					LOG.warn("The stream is currently not in active state. Reusing the older state "
+							+ "for the time being");
+				}
+				break;
+			} catch (ResourceNotFoundException reNotFound) {
+				throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
+			} catch (InvalidArgumentException inArg) {
+				throw new RuntimeException("Invalid Arguments to listShards.", inArg);
+			} catch (ExpiredNextTokenException expiredToken) {
+				LOG.warn("List Shards has an expired token. Reusing the previous state.");
+				break;
 			}
 		}
-
-		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
-		// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
-		if (startShardId != null) {
-			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
+		// the exclusive start shard id in the returned shards list; check if we need to remove
+		// these erroneously returned shards.
+		if (startShardId != null && listShardsResults != null) {
+			List<Shard> shards = listShardsResults.getShards();
 			Iterator<Shard> shardItr = shards.iterator();
 			while (shardItr.hasNext()) {
 				if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
@@ -424,8 +447,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 				}
 			}
 		}
-
-		return describeStreamResult;
+		return listShardsResults;
 	}
 
 	protected static long fullJitterBackoff(long base, long max, double power, int attempt) {

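The retry loop above sleeps for fullJitterBackoff(listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++) millis whenever listShards hits a LimitExceededException. The fullJitterBackoff helper itself is unchanged by this commit and its body is not part of the diff, so the following standalone sketch only illustrates the assumed full-jitter strategy: cap an exponentially growing delay at the configured maximum, then sleep for a uniformly random fraction of that cap.

import java.util.Random;

public class FullJitterBackoffSketch {

	private static final Random RANDOM = new Random();

	// Assumed behaviour: the delay grows as base * power^attempt, is capped at max,
	// and the actual sleep is drawn uniformly from [0, cappedDelay).
	static long fullJitterBackoff(long base, long max, double power, int attempt) {
		long cappedDelay = (long) Math.min(max, base * Math.pow(power, attempt));
		return (long) (RANDOM.nextDouble() * cappedDelay);
	}

	public static void main(String[] args) {
		// With the defaults (base 1000 ms, max 5000 ms, constant 1.5), retries 0..4
		// sleep at most ~1000, 1500, 2250, 3375 and 5000 ms respectively.
		for (int attempt = 0; attempt < 5; attempt++) {
			System.out.println("attempt " + attempt + ": sleeping " + fullJitterBackoff(1000L, 5000L, 1.5, attempt) + " ms");
		}
	}
}
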
http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index dc8b79b..a4d60ed 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -28,9 +28,13 @@ import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConsta
 
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -73,6 +77,8 @@ public class KinesisConfigUtil {
 	/** Default values for ThreadPoolSize. **/
 	protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
 
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisConfigUtil.class);
+
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
 	 */
@@ -142,14 +148,14 @@ public class KinesisConfigUtil {
 		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
 			"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");
 
-		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
-			"Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
+			"Invalid value given for list shards operation base backoff milliseconds. Must be a valid non-negative long value.");
 
-		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
-			"Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
+			"Invalid value given for list shards operation max backoff milliseconds. Must be a valid non-negative long value.");
 
-		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
-			"Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value.");
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");
 
 		if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
 			checkArgument(
@@ -181,6 +187,23 @@ public class KinesisConfigUtil {
 		return configProps;
 	}
 
+	public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
+		HashMap<String, String> deprecatedOldKeyToNewKeys = new HashMap<>();
+		deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
+		deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX);
+		deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT);
+		for (Map.Entry<String, String> entry : deprecatedOldKeyToNewKeys.entrySet()) {
+			String deprecatedOldKey = entry.getKey();
+			String newKey = entry.getValue();
+			if (configProps.containsKey(deprecatedOldKey)) {
+				LOG.warn("Please note that the {} property has been deprecated. Please use the new {} property key instead.", deprecatedOldKey, newKey);
+				configProps.setProperty(newKey, configProps.getProperty(deprecatedOldKey));
+				configProps.remove(deprecatedOldKey);
+			}
+		}
+		return configProps;
+	}
+
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisProducer},
 	 * and return a constructed KinesisProducerConfiguration.

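A minimal standalone sketch of what the new replaceDeprecatedConsumerKeys helper does to a user-supplied configuration (the class name and the property value below are placeholders; the constants and the helper are the ones added in this commit): a config that still sets a deprecated describeStream key is rewritten in place, so KinesisProxy only ever reads the new listShards keys.

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;

public class DeprecatedKeyMigrationSketch {

	public static void main(String[] args) {
		Properties config = new Properties();
		// A job that still sets the deprecated describeStream backoff key.
		config.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "2000");

		KinesisConfigUtil.replaceDeprecatedConsumerKeys(config);

		// The value has been moved to the new listShards key and the old key removed.
		System.out.println(config.getProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE));     // 2000
		System.out.println(config.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)); // false
	}
}
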
http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 25f4381..775ae4b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -18,6 +18,9 @@
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 
 import com.amazonaws.AmazonServiceException;
@@ -26,15 +29,34 @@ import com.amazonaws.ClientConfiguration;
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
@@ -70,6 +92,66 @@ public class KinesisProxyTest {
 	}
 
 	@Test
+	public void testGetShardList() throws Exception {
+		List<String> shardIds =
+				Arrays.asList(
+						"shardId-000000000000",
+						"shardId-000000000001",
+						"shardId-000000000002",
+						"shardId-000000000003");
+		String nextToken = "NextToken";
+		String fakeStreamName = "fake-stream";
+		List<Shard> shards = shardIds
+						.stream()
+						.map(shardId -> new Shard().withShardId(shardId))
+						.collect(Collectors.toList());
+		Properties kinesisConsumerConfig = new Properties();
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey");
+		kinesisConsumerConfig.setProperty(
+				ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey");
+		KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+		AmazonKinesis mockClient = mock(AmazonKinesis.class);
+		Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient);
+
+		ListShardsResult responseWithMoreData =
+				new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(nextToken);
+		ListShardsResult responseFinal =
+				new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
+		doReturn(responseWithMoreData)
+				.when(mockClient)
+				.listShards(argThat(initialListShardsRequestMatcher()));
+		doReturn(responseFinal)
+				.when(mockClient)
+				.listShards(argThat(listShardsNextToken(nextToken)));
+		HashMap<String, String> streamHashMap =
+				createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
+		GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap);
+
+		Assert.assertTrue(shardListResult.hasRetrievedShards());
+
+		Set<String> expectedStreams = new HashSet<>();
+		expectedStreams.add(fakeStreamName);
+		Assert.assertEquals(expectedStreams, shardListResult.getStreamsWithRetrievedShards());
+		List<StreamShardHandle> actualShardList =
+				shardListResult.getRetrievedShardListOfStream(fakeStreamName);
+		List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
+		assertThat(actualShardList, hasSize(4));
+		for (int i = 0; i < 4; i++) {
+			StreamShardHandle shardHandle =
+					new StreamShardHandle(
+							fakeStreamName,
+							new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
+			expectedStreamShard.add(shardHandle);
+		}
+
+		Assert.assertThat(
+				actualShardList,
+				containsInAnyOrder(
+						expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
+	}
+
+	@Test
 	public void testCustomConfigurationOverride() {
 		Properties configProps = new Properties();
 		configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
@@ -101,4 +183,65 @@ public class KinesisProxyTest {
 		assertEquals(9999, clientConfiguration.getSocketTimeout());
 	}
 
+	protected static HashMap<String, String>
+	createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
+		HashMap<String, String> initial = new HashMap<>();
+		for (String stream : streams) {
+			initial.put(stream, null);
+		}
+		return initial;
+	}
+
+	private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
+		return new ListShardsRequestMatcher(null, null);
+	}
+
+	private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
+		return new ListShardsRequestMatcher(null, nextToken);
+	}
+
+	private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
+		private final String shardId;
+		private final String nextToken;
+
+		ListShardsRequestMatcher(String shardIdArg, String nextTokenArg) {
+			shardId = shardIdArg;
+			nextToken = nextTokenArg;
+		}
+
+		@Override
+		protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
+			if (shardId == null) {
+				if (listShardsRequest.getExclusiveStartShardId() != null) {
+					return false;
+				}
+			} else {
+				if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) {
+					return false;
+				}
+			}
+
+			if (listShardsRequest.getNextToken() != null) {
+				if (!(listShardsRequest.getStreamName() == null
+								&& listShardsRequest.getExclusiveStartShardId() == null)) {
+					return false;
+				}
+
+				if (!listShardsRequest.getNextToken().equals(nextToken)) {
+					return false;
+				}
+			} else {
+				return nextToken == null;
+			}
+			return true;
+		}
+
+		@Override
+		public void describeTo(final Description description) {
+			description
+					.appendText("A ListShardsRequest with shardId: ")
+					.appendValue(shardId)
+					.appendText(" and nextToken: ").appendValue(nextToken);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 69ca58b..7d05783 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -333,34 +333,34 @@ public class KinesisConfigUtilTest {
 	}
 
 	@Test
-	public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
+	public void testUnparsableLongForListShardsBackoffBaseMillisInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
+		exception.expectMessage("Invalid value given for list shards operation base backoff milliseconds");
 
 		Properties testConfig = TestUtils.getStandardProperties();
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
-	public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
+	public void testUnparsableLongForListShardsBackoffMaxMillisInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
+		exception.expectMessage("Invalid value given for list shards operation max backoff milliseconds");
 
 		Properties testConfig = TestUtils.getStandardProperties();
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
+		testConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX, "unparsableLong");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
 
 	@Test
-	public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
+	public void testUnparsableDoubleForListShardsBackoffExponentialConstantInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
+		exception.expectMessage("Invalid value given for list shards operation backoff exponential constant");
 
 		Properties testConfig = TestUtils.getStandardProperties();
-		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+		testConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}