Posted to commits@flink.apache.org by tz...@apache.org on 2018/06/22 07:31:33 UTC
[1/7] flink git commit: [FLINK-9594][E2E tests] Improve docs for new test runner
Repository: flink
Updated Branches:
refs/heads/master ae8cef3de -> 7d034d4ef
[FLINK-9594][E2E tests] Improve docs for new test runner
This closes #6172.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1f79ee8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1f79ee8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1f79ee8
Branch: refs/heads/master
Commit: f1f79ee85f3488a682dfb5bedb5a779365759bed
Parents: d80f263
Author: Florian Schmidt <fl...@icloud.com>
Authored: Fri Jun 15 11:13:39 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200
----------------------------------------------------------------------
flink-end-to-end-tests/README.md | 42 +++++++++++++++++++++++++++++++----
1 file changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f1f79ee8/flink-end-to-end-tests/README.md
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/README.md b/flink-end-to-end-tests/README.md
index 82d7e41..26dba3d 100644
--- a/flink-end-to-end-tests/README.md
+++ b/flink-end-to-end-tests/README.md
@@ -28,11 +28,45 @@ where <flink dir> is a Flink distribution directory.
You can also run tests individually via
```
-$ FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh
+$ FLINK_DIR=<flink dir> flink-end-to-end-tests/run-single-test.sh your_test.sh arg1 arg2
```
## Writing Tests
-Have a look at test_batch_wordcount.sh for a very basic test and
-test_streaming_kafka010.sh for a more involved example. Whenever possible, try
-to put new functionality in common.sh so that it can be reused by other tests.
+### Examples
+Have a look at `test_batch_wordcount.sh` for a very basic test and
+`test_streaming_kafka010.sh` for a more involved example. Whenever possible, try
+to put new functionality in `common.sh` so that it can be reused by other tests.
+
+### Adding a test case
+In order to add a new test case, you need to add it to `run-nightly-tests.sh`, `run-pre-commit-tests.sh`, or both. Templates for adding tests can be found in those files.
+
+_Note: If you want to parameterize your tests, please do so by adding one test case per parameter combination, passing the parameters as arguments, to the nightly and/or pre-commit test suites. This allows the test runner to clean up between individual tests and to fail each test individually._
+
+### Passing your test
+A test is considered to have passed if:
+- it exits with code 0
+- there are no non-empty .out files (nothing was written to stdout / stderr by your Flink program)
+- there are no exceptions in the log files
+- there are no errors in the log files
+
+_Note: There is a whitelist for exceptions and errors that do not lead to failure, which can be found in the `check_logs_for_errors` and `check_logs_for_exceptions` functions in `test-scripts/common.sh`._
+
+Please note that a previously supported pattern, where you could assign a value to the global variable `PASS` to make your test fail, **is no longer supported**.
+
+### Cleanup
+The test runner performs a cleanup after each test case, which includes:
+- Stopping the cluster
+- Killing all task and job managers
+- Reverting config to default (if changed before)
+- Cleaning up log and temp directories
+
+In some cases your test needs to do some *additional* cleanup, for example shutting down external systems like Kafka or Elasticsearch. In this case, a common pattern is to trap a `test_cleanup` function on `EXIT` like this:
+
+```sh
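+function test_cleanup {
+ # do your custom cleanup here, e.g. stop Kafka or Elasticsearch
+ : # no-op placeholder; bash rejects a function body that contains only comments
+}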
+
+trap test_cleanup EXIT
+```
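The pass criteria listed in this README can be approximated by a small shell function. The following is a minimal sketch, not the actual implementation in `test-scripts/common.sh`: `check_test_passed` is a hypothetical name, the assumption that all `*.out` and `*.log` files sit in one directory is illustrative, and the whitelist of tolerated exceptions and errors is omitted.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of common.sh) approximating the pass criteria.
#   $1 - exit code of the test script
#   $2 - directory assumed to contain the Flink *.out and *.log files
# The real checks also apply a whitelist, which this sketch leaves out.
check_test_passed() {
  local exit_code=$1
  local log_dir=$2

  # 1. The test script must exit with code 0.
  if [ "$exit_code" -ne 0 ]; then
    echo "FAIL: exit code $exit_code"
    return 1
  fi

  # 2. No non-empty .out files (nothing written to stdout/stderr).
  local out_file
  for out_file in "$log_dir"/*.out; do
    [ -e "$out_file" ] || continue   # the glob matched no files
    if [ -s "$out_file" ]; then
      echo "FAIL: non-empty $out_file"
      return 1
    fi
  done

  # 3./4. No exceptions and no errors in the log files.
  # -s silences "no such file" if there are no *.log files at all.
  if grep -qs -e "Exception" -e "ERROR" "$log_dir"/*.log; then
    echo "FAIL: exceptions or errors in logs"
    return 1
  fi

  echo "PASS"
  return 0
}
```

Checking the exit code first mirrors the order of the list above; a whitelist could be added by filtering the `grep` output with `grep -v` before deciding.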
[2/7] flink git commit: [FLINK-9638][E2E Tests] Add helper script to run single test
[FLINK-9638][E2E Tests] Add helper script to run single test
This commit adds a helper script `run-single-test.sh` that allows you to run
a single test in the context of the test runner.
This provides you with
* Setup of ENV variables
* Cleanup after the test
* Nicer output
Usage: ./run-single-test.sh <path-to-test-script> [<arg1> <arg2> ...]
This closes #6197.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80f263f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80f263f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80f263f
Branch: refs/heads/master
Commit: d80f263f2e36acccf4dcae06f0cfe1717d09f58d
Parents: cc9ccbb
Author: Florian Schmidt <fl...@icloud.com>
Authored: Thu Jun 21 15:42:08 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200
----------------------------------------------------------------------
flink-end-to-end-tests/run-single-test.sh | 52 ++++++++++++++++++++++++++
1 file changed, 52 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d80f263f/flink-end-to-end-tests/run-single-test.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-single-test.sh b/flink-end-to-end-tests/run-single-test.sh
new file mode 100755
index 0000000..86b313d
--- /dev/null
+++ b/flink-end-to-end-tests/run-single-test.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This can be used to run a single test in the context of a test runner
+# Usage: ./run-single-test.sh test-scripts/my-test-case.sh
+
+if [ $# -eq 0 ]; then
+ echo "Usage: ./run-single-test.sh <path-to-test-script> [<arg1> <arg2> ...]"
+ exit 1
+fi
+
+END_TO_END_DIR="`dirname \"$0\"`" # relative
+END_TO_END_DIR="`( cd \"$END_TO_END_DIR\" && pwd )`" # absolutized and normalized
+if [ -z "$END_TO_END_DIR" ] ; then
+ # error; for some reason, the path is not accessible
+ # to the script (e.g. permissions re-evaled after suid)
+ exit 1 # fail
+fi
+
+export END_TO_END_DIR
+
+if [ -z "$FLINK_DIR" ] ; then
+ echo "You have to export the Flink distribution directory as FLINK_DIR"
+ exit 1
+fi
+
+source "${END_TO_END_DIR}/test-scripts/test-runner-common.sh"
+
+FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd )`" # absolutized and normalized
+
+echo "flink-end-to-end-test directory: $END_TO_END_DIR"
+echo "Flink distribution directory: $FLINK_DIR"
+
+run_test "$*" "$*"
+
+exit 0
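The `cd ... && pwd` idiom used above to absolutize `END_TO_END_DIR` and `FLINK_DIR` can also be written with `$(...)` command substitution, which avoids the escaped quotes inside backticks. A minimal sketch, where `abs_dir` is a hypothetical helper name that does not exist in the script:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the absolute, normalized form of a directory.
# `cd` runs in a subshell, so the caller's working directory is unchanged.
# Prints nothing and returns non-zero if the directory is not accessible.
abs_dir() {
  ( cd "$1" 2>/dev/null && pwd )
}

# Usage (mirrors run-single-test.sh):
#   END_TO_END_DIR=$(abs_dir "$(dirname "$0")")
#   [ -n "$END_TO_END_DIR" ] || exit 1
```

An empty result signals failure, which matches how the script checks `END_TO_END_DIR` with `-z` before exporting it.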
[6/7] flink git commit: [FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints
[FLINK-9394] [e2e] Test rescaling when resuming from externalized checkpoints
This closes #6038.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc9ccbbc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc9ccbbc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc9ccbbc
Branch: refs/heads/master
Commit: cc9ccbbc265774947a08e8a07a63449172a79ab5
Parents: cacde0f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 18 00:33:12 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200
----------------------------------------------------------------------
flink-end-to-end-tests/run-nightly-tests.sh | 24 ++++++---
.../test_resume_externalized_checkpoints.sh | 56 +++++++++++++++-----
2 files changed, 60 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cc9ccbbc/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index c4c5069..302e27f 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -61,13 +61,23 @@ run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$E
run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
-run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true"
-run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false"
-run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks"
-
-run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
-run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
-run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true"
+run_test "Resuming Externalized Checkpoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 file false true"
+run_test "Resuming Externalized Checkpoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file true true"
+run_test "Resuming Externalized Checkpoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 file false true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 4 rocks true true"
+run_test "Resuming Externalized Checkpoint (rocks, non-incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true false"
+run_test "Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 4 2 rocks true true"
+
+run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 rocks true true true"
run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"
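With this change, `test_resume_externalized_checkpoints.sh` takes its arguments in the order `<original_dop> <new_dop> <state_backend> <file_async> <rocks_incremental> [<simulate_failure>]`. The following sketch, using a hypothetical `print_resume_matrix` helper, enumerates the twelve non-failure variants registered above (in a different order) to make the combinations explicit:

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the test suite): print the positional
# arguments of each non-failure "Resuming Externalized Checkpoint" variant:
#   <original_dop> <new_dop> <state_backend> <file_async> <rocks_incremental>
print_resume_matrix() {
  local dop_pair async incremental
  # no parallelism change, scale up, scale down
  for dop_pair in "2 2" "2 4" "4 2"; do
    # file backend: vary async snapshots; the runs above pass "true" as the
    # incremental flag, which presumably only matters for RocksDB
    for async in true false; do
      echo "$dop_pair file $async true"
    done
    # RocksDB backend: vary incremental checkpoints; async is always "true"
    for incremental in false true; do
      echo "$dop_pair rocks true $incremental"
    done
  done
}
```

Each printed line corresponds to the argument string of one `run_test` call above.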
http://git-wip-us.apache.org/repos/asf/flink/blob/cc9ccbbc/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 6472b23..29c3786 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -17,13 +17,28 @@
# limitations under the License.
################################################################################
+if [ -z $1 ] || [ -z $2 ]; then
+ echo "Usage: ./test_resume_externalized_checkpoints.sh <original_dop> <new_dop> <state_backend_setting> <state_backend_file_async_setting> <state_backend_rocks_incremental_setting>"
+ exit 1
+fi
+
source "$(dirname "$0")"/common.sh
-STATE_BACKEND_TYPE=${1:-file}
-STATE_BACKEND_FILE_ASYNC=${2:-true}
-SIMULATE_FAILURE=${3:-false}
+ORIGINAL_DOP=$1
+NEW_DOP=$2
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
+STATE_BACKEND_ROCKS_INCREMENTAL=${5:-false}
+SIMULATE_FAILURE=${6:-false}
+
+if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+ NUM_SLOTS=$ORIGINAL_DOP
+else
+ NUM_SLOTS=$NEW_DOP
+fi
backup_config
+change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
setup_flink_slf4j_metric_reporter
start_cluster
@@ -43,18 +58,30 @@ CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
# run the DataStream allroundjob
-echo "Running externalized checkpoints test, with STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
+echo "Running externalized checkpoints test, \
+with ORIGINAL_DOP=$ORIGINAL_DOP NEW_DOP=$NEW_DOP \
+and STATE_BACKEND_TYPE=$STATE_BACKEND_TYPE STATE_BACKEND_FILE_ASYNC=$STATE_BACKEND_FILE_ASYNC \
+STATE_BACKEND_ROCKSDB_INCREMENTAL=$STATE_BACKEND_ROCKS_INCREMENTAL SIMULATE_FAILURE=$SIMULATE_FAILURE ..."
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-BASE_JOB_CMD="$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
- --test.semantics exactly-once \
- --environment.externalize_checkpoint true \
- --environment.externalize_checkpoint.cleanup retain \
- --state_backend $STATE_BACKEND_TYPE \
- --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
- --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
- --sequence_generator_source.sleep_time 15 \
- --sequence_generator_source.sleep_after_elements 1"
+
+function buildBaseJobCmd {
+ local dop=$1
+
+ echo "$FLINK_DIR/bin/flink run -d -p $dop $TEST_PROGRAM_JAR \
+ --test.semantics exactly-once \
+ --environment.parallelism $dop \
+ --environment.externalize_checkpoint true \
+ --environment.externalize_checkpoint.cleanup retain \
+ --state_backend $STATE_BACKEND_TYPE \
+ --state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
+ --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+ --state_backend.rocks.incremental $STATE_BACKEND_ROCKS_INCREMENTAL \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1"
+}
+
+BASE_JOB_CMD=`buildBaseJobCmd $ORIGINAL_DOP`
JOB_CMD=""
if [[ $SIMULATE_FAILURE == "true" ]]; then
@@ -98,6 +125,9 @@ if (( $NUM_CHECKPOINTS > 1 )); then
fi
echo "Restoring job with externalized checkpoint at $CHECKPOINT_PATH ..."
+
+BASE_JOB_CMD=`buildBaseJobCmd $NEW_DOP`
+
DATASTREAM_JOB=$($BASE_JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
wait_job_running $DATASTREAM_JOB
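The `NUM_SLOTS` logic above sizes the TaskManager slots to the larger of the two parallelisms, so that both the original run and the rescaled restore can be scheduled. A minimal sketch of that rule, assuming (as the single `taskmanager.numberOfTaskSlots` change suggests) that the test cluster runs one TaskManager; `required_slots` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: slots needed so that a job can first run with
# original_dop and later be restored with new_dop on one TaskManager.
required_slots() {
  local original_dop=$1
  local new_dop=$2
  if (( original_dop >= new_dop )); then
    echo "$original_dop"
  else
    echo "$new_dop"
  fi
}
```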
[3/7] flink git commit: [FLINK-8795] Fixed local scala shell for Flip6
[FLINK-8795] Fixed local scala shell for Flip6
This closes #6182.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1556aa93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1556aa93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1556aa93
Branch: refs/heads/master
Commit: 1556aa9359f11da00892420bad737f0597d08154
Parents: ef9e837
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Tue Jun 19 09:49:31 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/api/scala/FlinkShell.scala | 36 +++++++---
.../flink/api/scala/ScalaShellITCase.scala | 72 +++++++++++++-------
.../scala/ScalaShellLocalStartupITCase.scala | 14 +++-
3 files changed, 87 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1556aa93/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index dbdc052..b74a8a0 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -23,9 +23,9 @@ import java.io._
import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser}
import org.apache.flink.client.deployment.ClusterDescriptor
import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.configuration.{Configuration, GlobalConfiguration, JobManagerOptions}
+import org.apache.flink.configuration.{Configuration, CoreOptions, GlobalConfiguration, JobManagerOptions}
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
+import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
import scala.collection.mutable.ArrayBuffer
import scala.tools.nsc.Settings
@@ -137,20 +137,36 @@ object FlinkShell {
}
}
+ private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]
+
def fetchConnectionInfo(
configuration: Configuration,
config: Config
- ): (String, Int, Option[Either[StandaloneMiniCluster, ClusterClient[_]]]) = {
+ ): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = {
config.executionMode match {
case ExecutionMode.LOCAL => // Local mode
val config = configuration
config.setInteger(JobManagerOptions.PORT, 0)
- val miniCluster = new StandaloneMiniCluster(config)
+ val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
+ case CoreOptions.LEGACY_MODE => {
+ val cluster = new StandaloneMiniCluster(config)
+
+ (Left(cluster), cluster.getPort)
+ }
+ case CoreOptions.NEW_MODE => {
+ val miniClusterConfig = new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .build()
+ val cluster = new MiniCluster(miniClusterConfig)
+ cluster.start()
+
+ (Right(cluster), cluster.getRestAddress.getPort)
+ }
+ }
- println("\nStarting local Flink cluster (host: localhost, " +
- s"port: ${miniCluster.getPort}).\n")
- ("localhost", miniCluster.getPort, Some(Left(miniCluster)))
+ println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+ ("localhost", port, Some(Left(miniCluster)))
case ExecutionMode.REMOTE => // Remote mode
if (config.host.isEmpty || config.port.isEmpty) {
@@ -193,7 +209,8 @@ object FlinkShell {
val (repl, cluster) = try {
val (host, port, cluster) = fetchConnectionInfo(configuration, config)
val conf = cluster match {
- case Some(Left(miniCluster)) => miniCluster.getConfiguration
+ case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
+ case Some(Left(Right(_))) => configuration
case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
case None => configuration
}
@@ -223,7 +240,8 @@ object FlinkShell {
} finally {
repl.closeInterpreter()
cluster match {
- case Some(Left(miniCluster)) => miniCluster.close()
+ case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
+ case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
case Some(Right(yarnCluster)) => yarnCluster.shutdown()
case _ =>
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1556aa93/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 2e07fb9..12522a8 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,19 +19,16 @@
package org.apache.flink.api.scala
import java.io._
+import java.util.Objects
-import akka.actor.ActorRef
-import akka.pattern.Patterns
-import org.apache.flink.configuration.{Configuration, CoreOptions, TaskManagerOptions}
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
+import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
+import org.apache.flink.test.util.MiniClusterResource
import org.apache.flink.util.TestLogger
-import org.junit.rules.TemporaryFolder
import org.junit._
+import org.junit.rules.TemporaryFolder
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.FiniteDuration
import scala.tools.nsc.Settings
class ScalaShellITCase extends TestLogger {
@@ -280,11 +277,11 @@ class ScalaShellITCase extends TestLogger {
BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
val args = cluster match {
- case Some(cl) =>
+ case Some(_) =>
Array(
"remote",
- cl.getHostname,
- Integer.toString(cl.getPort),
+ hostname,
+ Integer.toString(port),
"--configDir",
dir.getAbsolutePath)
case None => throw new IllegalStateException("Cluster has not been started.")
@@ -314,17 +311,43 @@ class ScalaShellITCase extends TestLogger {
}
object ScalaShellITCase {
- var cluster: Option[StandaloneMiniCluster] = None
- val parallelism = 4
val configuration = new Configuration()
+ var cluster: Option[Either[MiniCluster, StandaloneMiniCluster]] = None
+
+ var port: Int = _
+ var hostname : String = _
+ val parallelism: Int = 4
@BeforeClass
def beforeAll(): Unit = {
- configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
- configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-
- cluster = Option(new StandaloneMiniCluster(configuration))
+ val isNew = Objects.equals(MiniClusterResource.NEW_CODEBASE,
+ System.getProperty(MiniClusterResource.CODEBASE_KEY))
+ if (isNew) {
+ configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+ // use a non-default REST port so as not to interfere with ScalaShellLocalStartupITCase
+ configuration.setInteger(RestOptions.PORT, 8082)
+ val miniConfig = new MiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumSlotsPerTaskManager(parallelism)
+ .build()
+
+ val miniCluster = new MiniCluster(miniConfig)
+ miniCluster.start()
+ port = miniCluster.getRestAddress.getPort
+ hostname = miniCluster.getRestAddress.getHost
+
+ cluster = Some(Left(miniCluster))
+ } else {
+ configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
+ val standaloneCluster = new StandaloneMiniCluster(configuration)
+
+ hostname = standaloneCluster.getHostname
+ port = standaloneCluster.getPort
+
+ cluster = Some(Right(standaloneCluster))
+ }
}
@AfterClass
@@ -332,7 +355,10 @@ object ScalaShellITCase {
// The Scala interpreter somehow changes the class loader. Therefore, we have to reset it
Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
- cluster.foreach(_.close)
+ cluster.foreach {
+ case Left(miniCluster) => miniCluster.close()
+ case Right(miniCluster) => miniCluster.close()
+ }
}
/**
@@ -350,18 +376,18 @@ object ScalaShellITCase {
System.setOut(new PrintStream(baos))
cluster match {
- case Some(cl) =>
+ case Some(_) =>
val repl = externalJars match {
case Some(ej) => new FlinkILoop(
- cl.getHostname,
- cl.getPort,
+ hostname,
+ port,
configuration,
Option(Array(ej)),
in, new PrintWriter(out))
case None => new FlinkILoop(
- cl.getHostname,
- cl.getPort,
+ hostname,
+ port,
configuration,
in, new PrintWriter(out))
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1556aa93/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index 9365948..a971db8 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -19,12 +19,14 @@
package org.apache.flink.api.scala
import java.io._
+import java.util.Objects
import org.apache.flink.configuration.{Configuration, CoreOptions}
import org.apache.flink.runtime.clusterframework.BootstrapTools
+import org.apache.flink.test.util.MiniClusterResource
import org.apache.flink.util.TestLogger
-import org.junit.{Assert, Rule, Test}
import org.junit.rules.TemporaryFolder
+import org.junit.{Assert, Rule, Test}
class ScalaShellLocalStartupITCase extends TestLogger {
@@ -85,7 +87,13 @@ class ScalaShellLocalStartupITCase extends TestLogger {
System.setOut(new PrintStream(baos))
val configuration = new Configuration()
- configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
+ val mode = if (Objects.equals(MiniClusterResource.NEW_CODEBASE,
+ System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ CoreOptions.NEW_MODE
+ } else {
+ CoreOptions.LEGACY_MODE
+ }
+ configuration.setString(CoreOptions.MODE, mode)
val dir = temporaryFolder.newFolder()
BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
@@ -93,7 +101,7 @@ class ScalaShellLocalStartupITCase extends TestLogger {
val args: Array[String] = Array("local", "--configDir", dir.getAbsolutePath)
//start flink scala shell
- FlinkShell.bufferedReader = Some(in);
+ FlinkShell.bufferedReader = Some(in)
FlinkShell.main(args)
baos.flush()
[5/7] flink git commit: [hotfix] [docs] Fix a typo in index.md
[hotfix] [docs] Fix a typo in index.md
This closes #6193.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cacde0f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cacde0f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cacde0f3
Branch: refs/heads/master
Commit: cacde0f3f567302d933ebf09c929d73e0c5c9d56
Parents: 1556aa9
Author: MichealShin <lu...@126.com>
Authored: Thu Jun 21 16:50:34 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200
----------------------------------------------------------------------
docs/index.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cacde0f3/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 78ee7d0..419bb39 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -31,7 +31,7 @@ Apache Flink is an open source platform for distributed stream and batch data pr
## First Steps
-- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommended you read these sections first.
+- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommend you read these sections first.
- **Quickstarts**: [Run an example program](quickstart/setup_quickstart.html) on your local machine or [study some examples](examples/index.html).
[7/7] flink git commit: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer backpressuring
[FLINK-9374] [kinesis] Enable FlinkKinesisProducer backpressuring
This closes #9374.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d034d4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d034d4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d034d4e
Branch: refs/heads/master
Commit: 7d034d4ef6986ba5ccda6f5e8c587b8fdd88be8e
Parents: f1f79ee
Author: Franz Thoma <fr...@tngtech.com>
Authored: Wed May 9 08:27:47 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:58 2018 +0200
----------------------------------------------------------------------
docs/dev/connectors/kinesis.md | 30 +++++++
.../kinesis/FlinkKinesisProducer.java | 70 ++++++++++++++-
.../connectors/kinesis/util/TimeoutLatch.java | 44 +++++++++
.../kinesis/FlinkKinesisProducerTest.java | 95 +++++++++++++++-----
.../flink/core/testutils/CheckedThread.java | 13 ++-
.../flink/core/testutils/MultiShotLatch.java | 9 ++
6 files changed, 239 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 7551142..03224a1 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -344,6 +344,36 @@ Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL fr
Users can still switch back to one-thread-per-request mode by setting a key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`, as shown in the code commented out in above example.
+### Backpressure
+
+By default, `FlinkKinesisProducer` does not backpressure. Instead, records that
+cannot be sent because of the rate restriction of 1 MB per second per shard are
+buffered in an unbounded queue and dropped when their `RecordTtl` expires.
+
+To avoid data loss, you can enable backpressuring by restricting the size of the
+internal queue:
+
+```java
+// 200 bytes per record, 1 shard
+kinesis.setQueueLimit(500);
+```
+
+The value for `queueLimit` depends on the expected record size. To choose a good
+value, consider that Kinesis is rate-limited to 1 MB per second per shard. If the
+queue holds less data than a shard can accept during the time records spend
+buffered in the KPL, the shard may not be used at full capacity. With the default
+`RecordMaxBufferedTime` of 100 ms, a queue size of 100 kB per shard (100 ms at
+1 MB/s) should be sufficient. The `queueLimit` can then be computed via
+
+```
+queue limit = (number of shards * queue size per shard) / record size
+```
+
+For example, for 200 bytes per record and 8 shards, a queue limit of 4000
+(8 * 100 kB / 200 bytes) is a good starting point. If the queue size limits
+throughput (below 1 MB per second per shard), try increasing the queue limit slightly.
+
+
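As a rough check of the sizing rule above, the formula can be written as a small helper. This is a sketch, not part of Flink: the class name `QueueLimitCalculator` is hypothetical, and the 100 kB per shard figure is the rule of thumb for the default `RecordMaxBufferedTime` quoted in the docs.

```java
// Hypothetical helper, not part of Flink: computes a starting value for
// FlinkKinesisProducer#setQueueLimit from the documented rule of thumb.
final class QueueLimitCalculator {

    // Rule of thumb from the docs: with the default RecordMaxBufferedTime of 100 ms,
    // buffering about 100 kB per shard (100 ms at 1 MB/s) should keep each shard busy.
    static final long QUEUE_BYTES_PER_SHARD = 100_000L;

    private QueueLimitCalculator() {
    }

    /** queue limit = (number of shards * queue size per shard) / record size */
    static int queueLimit(int numShards, int recordSizeBytes) {
        if (numShards <= 0 || recordSizeBytes <= 0) {
            throw new IllegalArgumentException("numShards and recordSizeBytes must be positive");
        }
        long limit = (numShards * QUEUE_BYTES_PER_SHARD) / recordSizeBytes;
        // setQueueLimit rejects non-positive values, so never return less than 1.
        return (int) Math.max(1L, Math.min(limit, Integer.MAX_VALUE));
    }
}
```

The result would then be passed to the producer, e.g. `kinesis.setQueueLimit(QueueLimitCalculator.queueLimit(8, 200))`, which yields the 4000 from the example above.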
## Using Non-AWS Kinesis Endpoints for Testing
It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index a9b48ae..b086ac1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -21,12 +21,15 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.TimeoutLatch;
import org.apache.flink.util.InstantiationUtil;
import com.amazonaws.services.kinesis.producer.Attempt;
@@ -55,6 +58,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@PublicEvolving
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
+ public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
+
+ public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
+
+ public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
+
private static final long serialVersionUID = 6447077318449477846L;
private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
@@ -65,6 +74,9 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
/* Flag controlling the error behavior of the producer */
private boolean failOnError = false;
+ /* Maximum length of the internal record queue before backpressuring */
+ private int queueLimit = Integer.MAX_VALUE;
+
/* Name of the default stream to produce to. Can be overwritten by the serialization schema */
private String defaultStream;
@@ -82,9 +94,15 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
/* Our Kinesis instance for each parallel Flink sink */
private transient KinesisProducer producer;
+ /* Backpressuring waits for this latch, triggered by record callback */
+ private transient TimeoutLatch backpressureLatch;
+
/* Callback handling failures */
private transient FutureCallback<UserRecordResult> callback;
+ /* Counts how often we have to wait for KPL because we are above the queue limit */
+ private transient Counter backpressureCycles;
+
/* Field for async exception */
private transient volatile Throwable thrownException;
@@ -145,6 +163,18 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
}
/**
+ * The {@link KinesisProducer} holds an unbounded queue internally. To avoid memory
+ * problems under high loads, a limit can be employed above which the sink blocks
+ * until the internal queue has drained below the limit, thereby applying backpressure.
+ *
+ * @param queueLimit The maximum length of the internal queue before backpressuring
+ */
+ public void setQueueLimit(int queueLimit) {
+ checkArgument(queueLimit > 0, "queueLimit must be a positive number");
+ this.queueLimit = queueLimit;
+ }
+
+ /**
* Set a default stream name.
* @param defaultStream Name of the default Kinesis stream
*/
@@ -180,9 +210,16 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
producer = getKinesisProducer(producerConfig);
+
+ final MetricGroup kinesisMetricGroup = getRuntimeContext().getMetricGroup().addGroup(KINESIS_PRODUCER_METRIC_GROUP);
+ this.backpressureCycles = kinesisMetricGroup.counter(METRIC_BACKPRESSURE_CYCLES);
+ kinesisMetricGroup.gauge(METRIC_OUTSTANDING_RECORDS_COUNT, producer::getOutstandingRecordsCount);
+
+ backpressureLatch = new TimeoutLatch();
callback = new FutureCallback<UserRecordResult>() {
@Override
public void onSuccess(UserRecordResult result) {
+ backpressureLatch.trigger();
if (!result.isSuccessful()) {
if (failOnError) {
// only remember the first thrown exception
@@ -197,6 +234,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
@Override
public void onFailure(Throwable t) {
+ backpressureLatch.trigger();
if (failOnError) {
thrownException = t;
} else {
@@ -219,6 +257,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
}
checkAndPropagateAsyncError();
+ boolean didWaitForFlush = enforceQueueLimit();
+
+ if (didWaitForFlush) {
+ checkAndPropagateAsyncError();
+ }
String stream = defaultStream;
String partition = defaultPartition;
@@ -327,6 +370,32 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
}
/**
+ * If the internal queue of the {@link KinesisProducer} gets too long,
+ * block until the KPL has sent enough records to bring it below the limit again.
+ * We don't call flush() here, since flushing _all_ records at this point would
+ * break record aggregation.
+ *
+ * @return whether the caller had to wait for the queue to drain
+ */
+ private boolean enforceQueueLimit() {
+ int attempt = 0;
+ while (producer.getOutstandingRecordsCount() >= queueLimit) {
+ backpressureCycles.inc();
+ if (attempt >= 10) {
+ LOG.warn("Waiting for the queue length to drop below the limit takes unusually long, still not done after {} attempts.", attempt);
+ }
+ attempt++;
+ try {
+ backpressureLatch.await(100);
+ } catch (InterruptedException e) {
+ LOG.warn("Waiting for the queue length to drop below the limit was interrupted.");
+ break;
+ }
+ }
+ return attempt > 0;
+ }
+
+ /**
* A reimplementation of {@link KinesisProducer#flushSync()}.
* This implementation releases the block on flushing if an interruption occurred.
*/
@@ -337,7 +406,6 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> implements
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.warn("Flushing was interrupted.");
-
break;
}
}
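The wait loop in `enforceQueueLimit()` can be modeled in isolation. The sketch below is a hypothetical stand-alone model, not Flink code: `BoundedSender` replaces the KPL with an `AtomicInteger` of outstanding records, and `onComplete()` plays the role of the `FutureCallback` that triggers `backpressureLatch`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model (not Flink code) of the wait loop in enforceQueueLimit():
// the sending thread blocks while too many records are outstanding and is woken
// by completion callbacks, with a 100 ms timeout as a safety net.
final class BoundedSender {
    private final int queueLimit;
    private final AtomicInteger outstanding = new AtomicInteger();
    private final Object lock = new Object();
    private int backpressureCycles; // only updated by the sending thread

    BoundedSender(int queueLimit) {
        if (queueLimit <= 0) {
            throw new IllegalArgumentException("queueLimit must be a positive number");
        }
        this.queueLimit = queueLimit;
    }

    /** Hands one record to the (simulated) KPL; returns true if it had to wait first. */
    boolean send() throws InterruptedException {
        boolean waited = false;
        while (outstanding.get() >= queueLimit) {
            waited = true;
            backpressureCycles++;
            synchronized (lock) {
                // Re-check under the lock: onComplete() notifies while holding the same
                // lock, so its wake-up cannot be lost between this check and wait().
                if (outstanding.get() >= queueLimit) {
                    lock.wait(100);
                }
            }
        }
        outstanding.incrementAndGet();
        return waited;
    }

    /** Completion callback (success or failure), like the FutureCallback above. */
    void onComplete() {
        outstanding.decrementAndGet();
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    int outstanding() {
        return outstanding.get();
    }

    int backpressureCycles() {
        return backpressureCycles;
    }
}
```

One design difference: this model re-checks the condition while holding the monitor that `onComplete()` notifies on, so the 100 ms timeout is only a safety net. In the diff above, `TimeoutLatch.trigger()` skips notification when no thread is marked as waiting, so a completion arriving just before `await()` may be missed, and the loop relies on the 100 ms timeout to re-check `getOutstandingRecordsCount()`.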
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
new file mode 100644
index 0000000..4dcab33
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+
+@Internal
+public class TimeoutLatch {
+
+ private final Object lock = new Object();
+ private volatile boolean waiting;
+
+ public void await(long timeout) throws InterruptedException {
+ synchronized (lock) {
+ waiting = true;
+ lock.wait(timeout);
+ }
+ }
+
+ public void trigger() {
+ if (waiting) {
+ synchronized (lock) {
+ waiting = false;
+ lock.notifyAll();
+ }
+ }
+ }
+}
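For comparison, the same "wait at most N ms or until triggered" contract can be expressed with `java.util.concurrent.locks`. This is an alternative sketch, not Flink's implementation; the class name `ConditionTimeoutLatch` is hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Alternative sketch, not Flink code: "wait at most timeoutMillis, or until
// triggered", built on a ReentrantLock and Condition instead of wait/notifyAll.
final class ConditionTimeoutLatch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition triggered = lock.newCondition();

    /**
     * Blocks until trigger() is called or the timeout elapses.
     * Returns false if the timeout elapsed, true if woken (spurious wake-ups included).
     */
    boolean await(long timeoutMillis) throws InterruptedException {
        lock.lock();
        try {
            return triggered.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
    }

    /** Wakes all threads currently waiting; a trigger with no waiter has no effect. */
    void trigger() {
        lock.lock();
        try {
            triggered.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

In both versions a trigger issued while no thread is waiting is lost, which is why callers should await with a timeout and re-check their condition in a loop. The trade-off is cost on the callback path: `TimeoutLatch.trigger()` avoids entering the `synchronized` block when its `waiting` flag is false, whereas this variant always takes the lock.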
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index 86cefff..6355cf5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@@ -267,6 +268,79 @@ public class FlinkKinesisProducerTest {
testHarness.close();
}
+ /**
+ * Test ensuring that the producer blocks if the queue limit is exceeded,
+ * until the queue length drops below the limit;
+ * we set a timeout because the test will not finish if the logic is broken.
+ */
+ @Test(timeout = 10000)
+ public void testBackpressure() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+ producer.setQueueLimit(1);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ UserRecordResult result = mock(UserRecordResult.class);
+ when(result.isSuccessful()).thenReturn(true);
+
+ CheckedThread msg1 = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ }
+ };
+ msg1.start();
+ msg1.trySync(100);
+ assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
+
+ // consume msg-1 so that queue is empty again
+ producer.getPendingRecordFutures().get(0).set(result);
+
+ CheckedThread msg2 = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ }
+ };
+ msg2.start();
+ msg2.trySync(100);
+ assertFalse("Flush triggered before reaching queue limit", msg2.isAlive());
+
+ CheckedThread moreElementsThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ // this should block until msg-2 is consumed
+ testHarness.processElement(new StreamRecord<>("msg-3"));
+ // this should block until msg-3 is consumed
+ testHarness.processElement(new StreamRecord<>("msg-4"));
+ }
+ };
+ moreElementsThread.start();
+
+ moreElementsThread.trySync(100);
+ assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+ // consume msg-2 from the queue, leaving msg-3 in the queue and msg-4 blocked
+ producer.getPendingRecordFutures().get(1).set(result);
+
+ moreElementsThread.trySync(100);
+ assertTrue("Producer should still block, but doesn't", moreElementsThread.isAlive());
+
+ // consume msg-3, blocked msg-4 can be inserted into the queue and block is released
+ producer.getPendingRecordFutures().get(2).set(result);
+
+ moreElementsThread.trySync(100);
+
+ assertFalse("Producer still blocks although the queue is flushed", moreElementsThread.isAlive());
+
+ producer.getPendingRecordFutures().get(3).set(result);
+
+ testHarness.close();
+ }
+
// ----------------------------------------------------------------------
// Utility test classes
// ----------------------------------------------------------------------
@@ -346,7 +420,6 @@ public class FlinkKinesisProducerTest {
private List<SettableFuture<UserRecordResult>> pendingRecordFutures = new LinkedList<>();
private transient MultiShotLatch flushLatch;
- private boolean isFlushed;
DummyFlinkKinesisProducer(SerializationSchema<T> schema) {
super(schema, TestUtils.getStandardProperties());
@@ -378,13 +451,6 @@ public class FlinkKinesisProducerTest {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
flushLatch.trigger();
-
- while (!isAllRecordFuturesCompleted()) {
- Thread.sleep(50);
- }
-
- isFlushed = true;
-
return null;
}
}).when(mockProducer).flush();
@@ -399,12 +465,11 @@ public class FlinkKinesisProducerTest {
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
- isFlushed = false;
super.snapshotState(context);
// if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
- if (!isFlushed) {
+ if (mockProducer.getOutstandingRecordsCount() > 0) {
throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
}
}
@@ -417,16 +482,6 @@ public class FlinkKinesisProducerTest {
flushLatch.await();
}
- private boolean isAllRecordFuturesCompleted() {
- for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
- if (!future.isDone()) {
- return false;
- }
- }
-
- return true;
- }
-
private int getNumPendingRecordFutures() {
int numPending = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
index f2647cc..7aa177a 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -97,8 +97,19 @@ public abstract class CheckedThread extends Thread {
* exceptions thrown from the {@link #go()} method.
*/
public void sync(long timeout) throws Exception {
- join(timeout);
+ trySync(timeout);
checkFinished();
+ }
+
+ /**
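+ * Waits with timeout until the thread is completed and checks whether any error
+ * occurred during the execution. Unlike {@link #sync(long)}, this method does not
+ * fail if the thread is still running after the timeout.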
+ *
+ * <p>This method blocks like {@link #join()}, but performs an additional check for
+ * exceptions thrown from the {@link #go()} method.
+ */
+ public void trySync(long timeout) throws Exception {
+ join(timeout);
checkError();
}
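To illustrate the split between `trySync` and `sync`, here is a simplified, hypothetical model of the pattern. It is not the Flink `CheckedThread` source (the bodies of `checkError()` and `checkFinished()` are not part of this diff); `MiniCheckedThread` is an illustrative name.

```java
// Hypothetical, simplified model of CheckedThread (not the Flink class):
// errors thrown by go() are captured and re-thrown to the thread that syncs.
abstract class MiniCheckedThread extends Thread {
    private volatile Throwable error;

    /** The work to run; any exception is captured for trySync/sync. */
    public abstract void go() throws Exception;

    @Override
    public final void run() {
        try {
            go();
        } catch (Throwable t) {
            error = t;
        }
    }

    /** Waits up to timeout ms and re-throws an error from go(); tolerates a still-running thread. */
    public void trySync(long timeout) throws Exception {
        join(timeout);
        checkError();
    }

    /** Like trySync, but also fails if the thread has not finished within the timeout. */
    public void sync(long timeout) throws Exception {
        trySync(timeout);
        if (isAlive()) {
            throw new Exception("Thread did not finish within " + timeout + " ms");
        }
    }

    private void checkError() throws Exception {
        Throwable t = error;
        if (t instanceof Exception) {
            throw (Exception) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t != null) {
            throw new Exception(t);
        }
    }
}
```

This is the property `testBackpressure` relies on: `trySync(100)` surfaces failures from the blocked thread without treating "still blocked" as an error, so the test can assert on `isAlive()` itself.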
http://git-wip-us.apache.org/repos/asf/flink/blob/7d034d4e/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
index 69f73eb..861ea31 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/MultiShotLatch.java
@@ -52,4 +52,13 @@ public final class MultiShotLatch {
triggered = false;
}
}
+
+ /**
+ * Checks if the latch was triggered.
+ *
+ * @return True, if the latch was triggered, false if not.
+ */
+ public boolean isTriggered() {
+ return triggered;
+ }
}
[4/7] flink git commit: [FLINK-8944] [Kinesis Connector] Use
listShards instead of DescribeStream for shard discovery as it offers higher
rate limits
Posted by tz...@apache.org.
[FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream for shard discovery as it offers higher rate limits
This closes #5992
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef9e8378
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef9e8378
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef9e8378
Branch: refs/heads/master
Commit: ef9e8378929b2f068c71262ed93702da0b023b04
Parents: ae8cef3
Author: Kailash HD <kd...@lyft.com>
Authored: Mon Mar 26 09:42:25 2018 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jun 22 09:30:57 2018 +0200
----------------------------------------------------------------------
.../kinesis/config/ConsumerConfigConstants.java | 36 +++++
.../connectors/kinesis/proxy/KinesisProxy.java | 134 +++++++++--------
.../kinesis/util/KinesisConfigUtil.java | 35 ++++-
.../kinesis/proxy/KinesisProxyTest.java | 143 +++++++++++++++++++
.../kinesis/util/KinesisConfigUtilTest.java | 18 +--
5 files changed, 295 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 38a0e3d..e46f79e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -65,15 +65,42 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
+ /**
+ * Deprecated key.
+ *
+ * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_BASE} instead
+ **/
+ @Deprecated
/** The base backoff time between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
+ /**
+ * Deprecated key.
+ *
+ * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_MAX} instead
+ **/
+ @Deprecated
/** The maximum backoff time between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
+ /**
+ * Deprecated key.
+ *
+ * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT} instead
+ **/
+ @Deprecated
/** The power constant for exponential backoff between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
+ /** The base backoff time between each listShards attempt. */
+ public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";
+
+ /** The maximum backoff time between each listShards attempt. */
+ public static final String LIST_SHARDS_BACKOFF_MAX = "flink.list.shards.backoff.max";
+
+ /** The power constant for exponential backoff between each listShards attempt. */
+ public static final String LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.list.shards.backoff.expconst";
+
/** The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard. */
public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
@@ -115,12 +142,21 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+ @Deprecated
public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
+ @Deprecated
public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
+ @Deprecated
public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+ public static final long DEFAULT_LIST_SHARDS_BACKOFF_BASE = 1000L;
+
+ public static final long DEFAULT_LIST_SHARDS_BACKOFF_MAX = 5000L;
+
+ public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;
public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
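The three `LIST_SHARDS_BACKOFF_*` keys feed the backoff used when `ListShards` hits `LimitExceededException`. The body of `fullJitterBackoff` is not part of this diff, so the sketch below assumes the standard full-jitter scheme: a uniform random delay between 0 and `min(max, base * expconst^attempt)`. The class `ListShardsBackoff` is hypothetical; the key strings and defaults are copied from `ConsumerConfigConstants` above.

```java
import java.util.Properties;
import java.util.Random;

// Sketch under the assumption of standard "full jitter" backoff; not Flink code.
// Reads the listShards backoff keys (falling back to the defaults above) and
// computes a random delay in [0, min(max, base * expConst^attempt)).
final class ListShardsBackoff {
    static final String BASE_KEY = "flink.list.shards.backoff.base";
    static final String MAX_KEY = "flink.list.shards.backoff.max";
    static final String EXP_KEY = "flink.list.shards.backoff.expconst";

    final long baseMillis;
    final long maxMillis;
    final double expConstant;
    private final Random random;

    ListShardsBackoff(Properties props, Random random) {
        this.baseMillis = Long.parseLong(props.getProperty(BASE_KEY, "1000"));
        this.maxMillis = Long.parseLong(props.getProperty(MAX_KEY, "5000"));
        this.expConstant = Double.parseDouble(props.getProperty(EXP_KEY, "1.5"));
        this.random = random;
    }

    /** Upper bound of the delay for the given attempt, before jitter. */
    long cappedExponential(int attempt) {
        return (long) Math.min(maxMillis, baseMillis * Math.pow(expConstant, attempt));
    }

    /** Full jitter: sleep a uniformly random fraction of the capped exponential delay. */
    long nextDelayMillis(int attempt) {
        return (long) (random.nextDouble() * cappedExponential(attempt));
    }
}
```

Under this assumption, the defaults (1000 ms, 5000 ms, 1.5) give upper bounds of roughly 1000, 1500, 2250 and 3375 ms, capped at 5000 ms from attempt index 4 on. Full jitter spreads retries from many parallel consumer subtasks, which matters because they all poll shard discovery against the same per-stream rate limit.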
http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 09e9d4c..7e6a360 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -21,24 +21,27 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredNextTokenException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.ResourceInUseException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.amazonaws.services.kinesis.model.StreamStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,17 +79,17 @@ public class KinesisProxy implements KinesisProxyInterface {
private static final Random seed = new Random();
// ------------------------------------------------------------------------
- // describeStream() related performance settings
+ // listShards() related performance settings
// ------------------------------------------------------------------------
- /** Base backoff millis for the describe stream operation. */
- private final long describeStreamBaseBackoffMillis;
+ /** Base backoff millis for the list shards operation. */
+ private final long listShardsBaseBackoffMillis;
- /** Maximum backoff millis for the describe stream operation. */
- private final long describeStreamMaxBackoffMillis;
+ /** Maximum backoff millis for the list shards operation. */
+ private final long listShardsMaxBackoffMillis;
- /** Exponential backoff power constant for the describe stream operation. */
- private final double describeStreamExpConstant;
+ /** Exponential backoff power constant for the list shards operation. */
+ private final double listShardsExpConstant;
// ------------------------------------------------------------------------
// getRecords() related performance settings
@@ -127,21 +130,22 @@ public class KinesisProxy implements KinesisProxyInterface {
*/
protected KinesisProxy(Properties configProps) {
checkNotNull(configProps);
+ KinesisConfigUtil.replaceDeprecatedConsumerKeys(configProps);
this.kinesisClient = createKinesisClient(configProps);
- this.describeStreamBaseBackoffMillis = Long.valueOf(
+ this.listShardsBaseBackoffMillis = Long.valueOf(
configProps.getProperty(
- ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
- Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
- this.describeStreamMaxBackoffMillis = Long.valueOf(
+ ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
+ Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_BASE)));
+ this.listShardsMaxBackoffMillis = Long.valueOf(
configProps.getProperty(
- ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
- Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
- this.describeStreamExpConstant = Double.valueOf(
+ ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
+ Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_MAX)));
+ this.listShardsExpConstant = Double.valueOf(
configProps.getProperty(
- ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
- Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
+ ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
+ Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getRecordsBaseBackoffMillis = Long.valueOf(
configProps.getProperty(
@@ -353,19 +357,24 @@ public class KinesisProxy implements KinesisProxyInterface {
private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
List<StreamShardHandle> shardsOfStream = new ArrayList<>();
- DescribeStreamResult describeStreamResult;
+ // A single ListShards call returns at most 1000 shards. In order to read the entire stream,
+ // we need to follow the returned nextToken to get additional shards.
+ ListShardsResult listShardsResult;
+ String startShardToken = null;
do {
- describeStreamResult = describeStream(streamName, lastSeenShardId);
-
- List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+ listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
+ if (listShardsResult == null) {
+ // If retrieving the shard list failed part-way, clear the incomplete list
+ // so that a partial shard list is never returned.
+ shardsOfStream.clear();
+ return shardsOfStream;
+ }
+ List<Shard> shards = listShardsResult.getShards();
for (Shard shard : shards) {
shardsOfStream.add(new StreamShardHandle(streamName, shard));
}
-
- if (shards.size() != 0) {
- lastSeenShardId = shards.get(shards.size() - 1).getShardId();
- }
- } while (describeStreamResult.getStreamDescription().isHasMoreShards());
+ startShardToken = listShardsResult.getNextToken();
+ } while (startShardToken != null);
return shardsOfStream;
}
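The pagination in `getShardsOfStream()` can be modeled without the AWS SDK. In this hypothetical sketch, `ShardPage` stands in for `ListShardsResult` and `ShardPageSource` for the retrying `listShards(...)` helper, which returns null when it gives up (as on `ExpiredNextTokenException`); none of these names are Flink or AWS APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for one ListShards response (not the AWS SDK type).
final class ShardPage {
    final List<String> shardIds;
    final String nextToken; // null when there are no more pages

    ShardPage(List<String> shardIds, String nextToken) {
        this.shardIds = shardIds;
        this.nextToken = nextToken;
    }
}

// Hypothetical page source; returns null if the call gave up.
interface ShardPageSource {
    ShardPage fetch(String streamName, String exclusiveStartShardId, String nextToken);
}

final class ShardLister {
    /** Follows nextToken until exhausted; returns an empty list if any page is missing. */
    static List<String> listAllShards(ShardPageSource source, String streamName, String lastSeenShardId) {
        List<String> shards = new ArrayList<>();
        String token = null;
        do {
            // Mirrors listShards(): the first request names the stream (and start shard),
            // later requests carry only the nextToken.
            ShardPage page = (token == null)
                    ? source.fetch(streamName, lastSeenShardId, null)
                    : source.fetch(null, null, token);
            if (page == null) {
                // Never hand out a partial shard list.
                return Collections.emptyList();
            }
            shards.addAll(page.shardIds);
            token = page.nextToken;
        } while (token != null);
        return shards;
    }
}
```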
@@ -382,41 +391,55 @@ public class KinesisProxy implements KinesisProxyInterface {
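* @param startShardId which shard to start with for this list shards operation (earlier shards will not appear in the result)
* @param startNextToken the nextToken returned by a previous listShards call, or null to issue a request based on streamName and startShardId
* @return the result of the list shards operation, or null if it could not be completed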
*/
- private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
- final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
- describeStreamRequest.setStreamName(streamName);
- describeStreamRequest.setExclusiveStartShardId(startShardId);
+ private ListShardsResult listShards(String streamName, @Nullable String startShardId,
+ @Nullable String startNextToken)
+ throws InterruptedException {
+ final ListShardsRequest listShardsRequest = new ListShardsRequest();
+ if (startNextToken == null) {
+ listShardsRequest.setExclusiveStartShardId(startShardId);
+ listShardsRequest.setStreamName(streamName);
+ } else {
+ // Note that the nextToken returned by AWS expires after 300 seconds.
+ listShardsRequest.setNextToken(startNextToken);
+ }
- DescribeStreamResult describeStreamResult = null;
+ ListShardsResult listShardsResults = null;
- // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+ // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
int attemptCount = 0;
- while (describeStreamResult == null) { // retry until we get a result
+ // Retry until ListShards returns a result; following nextToken across pages
+ // (at most 1000 shards each) is handled by getShardsOfStream().
+ while (listShardsResults == null) { // retry until we get a result
try {
- describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+
+ listShardsResults = kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
- describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
- LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
- + backoffMillis + " millis.");
+ listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+ LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
+ + ". Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
- } catch (ResourceNotFoundException re) {
- throw new RuntimeException("Error while getting stream details", re);
- }
- }
-
- String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
- if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
- "describeStream operation will not contain any shard information.");
+ } catch (ResourceInUseException reInUse) {
+ // ListShards throws this exception if the stream is not in ACTIVE state;
+ // stop retrying and reuse the previously discovered shard state for now.
+ LOG.warn("The stream " + streamName + " is currently not in ACTIVE state. "
+ + "Reusing the previously discovered shard state for the time being.");
+ // Always leave the retry loop here, regardless of the log level.
+ break;
+ } catch (ResourceNotFoundException reNotFound) {
+ throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
+ } catch (InvalidArgumentException inArg) {
+ throw new RuntimeException("Invalid Arguments to listShards.", inArg);
+ } catch (ExpiredNextTokenException expiredToken) {
+ LOG.warn("The ListShards nextToken has expired. Reusing the previously discovered shard state.");
+ break;
}
}
-
- // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
- // start shard id in the returned shards list; check if we need to remove these erroneously returned shards
- if (startShardId != null) {
- List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+ // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
+ // the exclusive start shard id in the returned shards list; check if we need to remove
+ // these erroneously returned shards.
+ if (startShardId != null && listShardsResults != null) {
+ List<Shard> shards = listShardsResults.getShards();
Iterator<Shard> shardItr = shards.iterator();
while (shardItr.hasNext()) {
if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
@@ -424,8 +447,7 @@ public class KinesisProxy implements KinesisProxyInterface {
}
}
}
-
- return describeStreamResult;
+ return listShardsResults;
}
protected static long fullJitterBackoff(long base, long max, double power, int attempt) {
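A minimal, self-contained sketch of the pattern the `listShards` change relies on: fetch one page at a time, retry a throttled page with full-jitter backoff, and keep following `nextToken` until it is null. `ShardPage`, `ShardLister` and `ThrottledException` are hypothetical stand-ins for the AWS SDK types (`ListShardsResult`, `AmazonKinesis#listShards`, `LimitExceededException`), and the backoff formula is an assumption about what `fullJitterBackoff` computes, since its body is not part of this diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Sketch only: ShardPage, ShardLister and ThrottledException are hypothetical
// stand-ins for the AWS SDK types, not real Flink or AWS classes.
final class ListShardsPagingSketch {

    /** One page of results: shard ids plus the continuation token (null on the last page). */
    static final class ShardPage {
        final List<String> shardIds;
        final String nextToken;

        ShardPage(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /** Hypothetical stand-in for LimitExceededException. */
    static final class ThrottledException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /**
     * Hypothetical client. As in the patch, the first call identifies the stream
     * by name and later calls should send only the token.
     */
    interface ShardLister {
        ShardPage list(String streamName, String nextToken);
    }

    /** Full jitter (assumed formula): uniform random delay in [0, min(max, base * power^attempt)). */
    static long fullJitterBackoff(long base, long max, double power, int attempt, Random rnd) {
        double cap = Math.min((double) max, base * Math.pow(power, attempt));
        return (long) (rnd.nextDouble() * cap);
    }

    /** Collects shard ids from every page, retrying a throttled page before moving on. */
    static List<String> listAllShards(ShardLister lister, String streamName,
            long base, long max, double power) throws InterruptedException {
        Random rnd = new Random();
        List<String> all = new ArrayList<>();
        String token = null; // null requests the first page
        do {
            ShardPage page = null;
            int attempt = 0;
            while (page == null) { // retry the same page until it succeeds
                try {
                    page = lister.list(streamName, token);
                } catch (ThrottledException e) {
                    Thread.sleep(fullJitterBackoff(base, max, power, attempt++, rnd));
                }
            }
            all.addAll(page.shardIds);
            token = page.nextToken;
        } while (token != null);
        return all;
    }
}
```

Retrying inside the page loop means a throttled call does not discard pages already collected. A production version would also bound the number of retries and decide what to do with an expired token, which the patch handles by falling back to the previously discovered state.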
http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index dc8b79b..a4d60ed 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -28,9 +28,13 @@ import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConsta
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -73,6 +77,8 @@ public class KinesisConfigUtil {
/** Default values for ThreadPoolSize. **/
protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisConfigUtil.class);
+
/**
* Validate configuration properties for {@link FlinkKinesisConsumer}.
*/
@@ -142,14 +148,14 @@ public class KinesisConfigUtil {
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");
- validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
- "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
+ "Invalid value given for list shards operation base backoff milliseconds. Must be a valid non-negative long value.");
- validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
- "Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+ validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
+ "Invalid value given for list shards operation max backoff milliseconds. Must be a valid non-negative long value.");
- validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
- "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value.");
+ validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
+ "Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");
if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
checkArgument(
@@ -181,6 +187,23 @@ public class KinesisConfigUtil {
return configProps;
}
+ public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
+ HashMap<String, String> deprecatedOldKeyToNewKeys = new HashMap<>();
+ deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
+ deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX);
+ deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT);
+ for (Map.Entry<String, String> entry : deprecatedOldKeyToNewKeys.entrySet()) {
+ String deprecatedOldKey = entry.getKey();
+ String newKey = entry.getValue();
+ if (configProps.containsKey(deprecatedOldKey)) {
+ LOG.warn("Property {} has been deprecated. Please use the new property key {} instead.", deprecatedOldKey, newKey);
+ configProps.setProperty(newKey, configProps.getProperty(deprecatedOldKey));
+ configProps.remove(deprecatedOldKey);
+ }
+ }
+ return configProps;
+ }
+
/**
* Validate configuration properties for {@link FlinkKinesisProducer},
* and return a constructed KinesisProducerConfiguration.
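A standalone sketch of what `replaceDeprecatedConsumerKeys` does, written against plain `java.util.Properties` so it runs without Flink. The key strings used with it are hypothetical placeholders, not the real `ConsumerConfigConstants` values, and `System.err` stands in for the SLF4J logger.

```java
import java.util.Map;
import java.util.Properties;

// Sketch of the deprecated-key migration in replaceDeprecatedConsumerKeys.
// Keys passed in are hypothetical placeholders, not the real constants.
final class DeprecatedKeySketch {

    /** Moves each deprecated key's value to its replacement key and drops the old key. */
    static Properties replaceDeprecatedKeys(Properties props, Map<String, String> oldToNew) {
        for (Map.Entry<String, String> entry : oldToNew.entrySet()) {
            String oldKey = entry.getKey();
            String newKey = entry.getValue();
            if (props.containsKey(oldKey)) {
                System.err.println("Property " + oldKey + " is deprecated; use " + newKey + " instead.");
                // Same precedence as the patch: the deprecated key's value overwrites
                // any value already set under the new key.
                props.setProperty(newKey, props.getProperty(oldKey));
                props.remove(oldKey);
            }
        }
        return props;
    }
}
```

One consequence visible in the patch, and reproduced here: if a user sets both the old and the new key, the old key's value wins.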
http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 25f4381..775ae4b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import com.amazonaws.AmazonServiceException;
@@ -26,15 +29,34 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
/**
* Test for methods in the {@link KinesisProxy} class.
@@ -70,6 +92,66 @@ public class KinesisProxyTest {
}
@Test
+ public void testGetShardList() throws Exception {
+ List<String> shardIds =
+ Arrays.asList(
+ "shardId-000000000000",
+ "shardId-000000000001",
+ "shardId-000000000002",
+ "shardId-000000000003");
+ String nextToken = "NextToken";
+ String fakeStreamName = "fake-stream";
+ List<Shard> shards = shardIds
+ .stream()
+ .map(shardId -> new Shard().withShardId(shardId))
+ .collect(Collectors.toList());
+ Properties kinesisConsumerConfig = new Properties();
+ kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+ kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey");
+ kinesisConsumerConfig.setProperty(
+ ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey");
+ KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+ AmazonKinesis mockClient = mock(AmazonKinesis.class);
+ Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient);
+
+ ListShardsResult responseWithMoreData =
+ new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(nextToken);
+ ListShardsResult responseFinal =
+ new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
+ doReturn(responseWithMoreData)
+ .when(mockClient)
+ .listShards(argThat(initialListShardsRequestMatcher()));
+ doReturn(responseFinal)
+ .when(mockClient)
+ .listShards(argThat(listShardsNextToken(nextToken)));
+ HashMap<String, String> streamHashMap =
+ createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
+ GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap);
+
+ Assert.assertTrue(shardListResult.hasRetrievedShards());
+
+ Set<String> expectedStreams = new HashSet<>();
+ expectedStreams.add(fakeStreamName);
+ Assert.assertEquals(expectedStreams, shardListResult.getStreamsWithRetrievedShards());
+ List<StreamShardHandle> actualShardList =
+ shardListResult.getRetrievedShardListOfStream(fakeStreamName);
+ List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
+ assertThat(actualShardList, hasSize(4));
+ for (int i = 0; i < 4; i++) {
+ StreamShardHandle shardHandle =
+ new StreamShardHandle(
+ fakeStreamName,
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
+ expectedStreamShard.add(shardHandle);
+ }
+
+ Assert.assertThat(
+ actualShardList,
+ containsInAnyOrder(
+ expectedStreamShard.toArray(new StreamShardHandle[expectedStreamShard.size()])));
+ }
+
+ @Test
public void testCustomConfigurationOverride() {
Properties configProps = new Properties();
configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
@@ -101,4 +183,65 @@ public class KinesisProxyTest {
assertEquals(9999, clientConfiguration.getSocketTimeout());
}
+ protected static HashMap<String, String>
+ createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
+ HashMap<String, String> initial = new HashMap<>();
+ for (String stream : streams) {
+ initial.put(stream, null);
+ }
+ return initial;
+ }
+
+ private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
+ return new ListShardsRequestMatcher(null, null);
+ }
+
+ private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
+ return new ListShardsRequestMatcher(null, nextToken);
+ }
+
+ private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
+ private final String shardId;
+ private final String nextToken;
+
+ ListShardsRequestMatcher(String shardIdArg, String nextTokenArg) {
+ shardId = shardIdArg;
+ nextToken = nextTokenArg;
+ }
+
+ @Override
+ protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
+ if (shardId == null) {
+ if (listShardsRequest.getExclusiveStartShardId() != null) {
+ return false;
+ }
+ } else {
+ if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) {
+ return false;
+ }
+ }
+
+ if (listShardsRequest.getNextToken() != null) {
+ if (!(listShardsRequest.getStreamName() == null
+ && listShardsRequest.getExclusiveStartShardId() == null)) {
+ return false;
+ }
+
+ if (!listShardsRequest.getNextToken().equals(nextToken)) {
+ return false;
+ }
+ } else {
+ return nextToken == null;
+ }
+ return true;
+ }
+
+ @Override
+ public void describeTo(final Description description) {
+ description
+ .appendText("A ListShardsRequest with a shardId: ")
+ .appendValue(shardId)
+ .appendText(" and nextToken: ").appendValue(nextToken);
+ }
+ }
}
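The `ListShardsRequestMatcher` in this test encodes the rule that a follow-up ListShards request carries only `nextToken`, without `streamName` or `exclusiveStartShardId`; the AWS ListShards API rejects requests that combine `NextToken` with either of those parameters. The sketch below states that rule as a small classifier. `RequestShape` and `classify` are hypothetical names, and the sketch assumes the stream is identified by name only.

```java
// Hypothetical helper, not part of Flink or the AWS SDK: classifies a
// ListShards-style request by which of its fields are set.
final class ListShardsRequestShapeSketch {

    enum RequestShape { BY_STREAM_NAME, BY_NEXT_TOKEN, INVALID }

    static RequestShape classify(String streamName, String exclusiveStartShardId, String nextToken) {
        if (nextToken != null) {
            // A continuation request must carry only the token.
            return (streamName == null && exclusiveStartShardId == null)
                    ? RequestShape.BY_NEXT_TOKEN
                    : RequestShape.INVALID;
        }
        // Without a token, the stream name says what to list; an exclusive
        // start shard id may optionally narrow the result.
        return streamName != null ? RequestShape.BY_STREAM_NAME : RequestShape.INVALID;
    }
}
```

This mirrors the branch at the top of `listShards` in the patch, which sets either the stream name and exclusive start shard id, or only the token.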
http://git-wip-us.apache.org/repos/asf/flink/blob/ef9e8378/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 69ca58b..7d05783 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -333,34 +333,34 @@ public class KinesisConfigUtilTest {
}
@Test
- public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
+ public void testUnparsableLongForListShardsBackoffBaseMillisInConfig() {
exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
+ exception.expectMessage("Invalid value given for list shards operation base backoff milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
- testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
+ testConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
- public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
+ public void testUnparsableLongForListShardsBackoffMaxMillisInConfig() {
exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
+ exception.expectMessage("Invalid value given for list shards operation max backoff milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
- testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
+ testConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
- public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
+ public void testUnparsableDoubleForListShardsBackoffExponentialConstantInConfig() {
exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
+ exception.expectMessage("Invalid value given for list shards operation backoff exponential constant");
Properties testConfig = TestUtils.getStandardProperties();
- testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+ testConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}