You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/04/18 12:10:23 UTC
[1/2] flink git commit: [FLINK-8990] [test] Test partition discovery
in Kafka end-to-end test
Repository: flink
Updated Branches:
refs/heads/master d5ec911f5 -> e77f9d28b
[FLINK-8990] [test] Test partition discovery in Kafka end-to-end test
This closes #5779.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e77f9d28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e77f9d28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e77f9d28
Branch: refs/heads/master
Commit: e77f9d28bed3c0c0274db52d1e9f10f2a0b3dc38
Parents: 9fec5ca
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Mar 28 16:36:18 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Apr 18 20:09:34 2018 +0800
----------------------------------------------------------------------
.../test-scripts/kafka-common.sh | 27 ++++++++++-
.../test-scripts/test_streaming_kafka010.sh | 47 ++++++++++++++++++--
2 files changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e77f9d28/flink-end-to-end-tests/test-scripts/kafka-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh
index 7f05357..49ff4fe 100644
--- a/flink-end-to-end-tests/test-scripts/kafka-common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -70,5 +70,30 @@ function send_messages_to_kafka {
}
function read_messages_from_kafka {
- $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $2 --from-beginning --max-messages $1 2> /dev/null
+ $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
+ --max-messages $1 \
+ --topic $2 \
+ --consumer-property group.id=$3 2> /dev/null
+}
+
+function modify_num_partitions {
+ $KAFKA_DIR/bin/kafka-topics.sh --alter --topic $1 --partitions $2 --zookeeper localhost:2181
+}
+
+function get_num_partitions {
+ $KAFKA_DIR/bin/kafka-topics.sh --describe --topic $1 --zookeeper localhost:2181 | grep -Eo "PartitionCount:[0-9]+" | cut -d ":" -f 2
+}
+
+function get_partition_end_offset {
+ local topic=$1
+ local partition=$2
+
+ # first, use the console consumer to create a dummy consumer group
+ read_messages_from_kafka 0 $topic dummy-consumer
+
+ # then use the consumer offset utility to get the LOG_END_OFFSET value for the specified partition
+ $KAFKA_DIR/bin/kafka-consumer-groups.sh --describe --group dummy-consumer --bootstrap-server localhost:9092 2> /dev/null \
+ | grep "$topic \+$partition" \
+ | tr -s " " \
+ | cut -d " " -f 4
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e77f9d28/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index 4d7525f..abc6186 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -54,7 +54,8 @@ create_kafka_topic 1 1 test-output
$FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
--input-topic test-input --output-topic test-output \
--prefix=PREFIX \
- --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest
+ --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
+ --flink.partition-discovery.interval-millis 1000
function verify_output {
local expected=$(printf $1)
@@ -68,13 +69,51 @@ function verify_output {
fi
}
+echo "Sending messages to Kafka topic [test-input] ..."
# send some data to Kafka
send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input
-KEY_1_MSGS=$(read_messages_from_kafka 6 test-output | grep elephant)
-KEY_2_MSGS=$(read_messages_from_kafka 6 test-output | grep squirrel)
-KEY_3_MSGS=$(read_messages_from_kafka 6 test-output | grep bee)
+
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 6 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 6 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 6 test-output bee_consumer | grep bee)
# check all keys; make sure we have actual newlines in the string, not "\n"
verify_output "elephant,5,45218\nelephant,14,54867" "$KEY_1_MSGS"
verify_output "squirrel,12,46213\nsquirrel,34,52444" "$KEY_2_MSGS"
verify_output "bee,3,51348\nbee,13,53412" "$KEY_3_MSGS"
+
+# now, we add a new partition to the topic
+echo "Repartitioning Kafka topic [test-input] ..."
+modify_num_partitions test-input 2
+
+if (( $(get_num_partitions test-input) != 2 )); then
+ echo "Failed adding a partition to test-input topic."
+ PASS=""
+ exit 1
+fi
+
+# send some more messages to Kafka
+echo "Sending more messages to Kafka topic [test-input] ..."
+send_messages_to_kafka "elephant,13,64213\ngiraffe,9,65555\nbee,5,65647\nsquirrel,18,66413" test-input
+
+# verify our assumption that the new partition actually has messages written to it
+if (( $(get_partition_end_offset test-input 1) == 0 )); then
+ echo "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified."
+ PASS=""
+ exit 1
+fi
+
+# all new messages should have been consumed and should have produced correct output
+echo "Verifying messages from Kafka topic [test-output] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 4 test-output elephant_consumer | grep elephant)
+KEY_2_MSGS=$(read_messages_from_kafka 4 test-output squirrel_consumer | grep squirrel)
+KEY_3_MSGS=$(read_messages_from_kafka 4 test-output bee_consumer | grep bee)
+KEY_4_MSGS=$(read_messages_from_kafka 10 test-output giraffe_consumer | grep giraffe)
+
+verify_output "elephant,27,64213" "$KEY_1_MSGS"
+verify_output "squirrel,52,66413" "$KEY_2_MSGS"
+verify_output "bee,18,65647" "$KEY_3_MSGS"
+verify_output "giraffe,9,65555" "$KEY_4_MSGS"
[2/2] flink git commit: [FLINK-9124] [kinesis] Allow customization of
KinesisProxy.getRecords read timeout and retry.
Posted by tz...@apache.org.
[FLINK-9124] [kinesis] Allow customization of KinesisProxy.getRecords read timeout and retry.
This closes #5803.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fec5cae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fec5cae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fec5cae
Branch: refs/heads/master
Commit: 9fec5cae65a7cddea5dff47e5cade50493f6cbdf
Parents: d5ec911
Author: Thomas Weise <th...@apache.org>
Authored: Mon Apr 2 20:49:50 2018 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Apr 18 20:09:34 2018 +0800
----------------------------------------------------------------------
.../connectors/kinesis/proxy/KinesisProxy.java | 40 +++++++++++++++++---
.../connectors/kinesis/util/AWSUtil.java | 17 +++++++--
.../kinesis/proxy/KinesisProxyTest.java | 27 +++++++++++++
3 files changed, 74 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9fec5cae/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 057e18d..3486206 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -23,6 +23,9 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
@@ -125,7 +128,7 @@ public class KinesisProxy implements KinesisProxyInterface {
protected KinesisProxy(Properties configProps) {
checkNotNull(configProps);
- this.kinesisClient = AWSUtil.createKinesisClient(configProps);
+ this.kinesisClient = createKinesisClient(configProps);
this.describeStreamBaseBackoffMillis = Long.valueOf(
configProps.getProperty(
@@ -177,6 +180,16 @@ public class KinesisProxy implements KinesisProxyInterface {
}
/**
+ * Create the Kinesis client, using the provided configuration properties and default {@link ClientConfiguration}.
+ * Derived classes can override this method to customize the client configuration.
+ * @param configProps
+ * @return
+ */
+ protected AmazonKinesis createKinesisClient(Properties configProps) {
+ return AWSUtil.createKinesisClient(configProps, new ClientConfigurationFactory().getConfig());
+ }
+
+ /**
* Creates a Kinesis proxy.
*
* @param configProps configuration properties
@@ -201,12 +214,12 @@ public class KinesisProxy implements KinesisProxyInterface {
while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
try {
getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
- } catch (AmazonServiceException ex) {
- if (isRecoverableException(ex)) {
+ } catch (SdkClientException ex) {
+ if (isRecoverableSdkClientException(ex)) {
long backoffMillis = fullJitterBackoff(
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
- LOG.warn("Got recoverable AmazonServiceException. Backing off for "
- + backoffMillis + " millis (" + ex.getErrorMessage() + ")");
+ LOG.warn("Got recoverable SdkClientException. Backing off for "
+ + backoffMillis + " millis (" + ex.getMessage() + ")");
Thread.sleep(backoffMillis);
} else {
throw ex;
@@ -303,6 +316,21 @@ public class KinesisProxy implements KinesisProxyInterface {
* @return <code>true</code> if the exception can be recovered from, else
* <code>false</code>
*/
+ protected boolean isRecoverableSdkClientException(SdkClientException ex) {
+ if (ex instanceof AmazonServiceException) {
+ return KinesisProxy.isRecoverableException((AmazonServiceException) ex);
+ }
+ // customizations may decide to retry other errors, such as read timeouts
+ return false;
+ }
+
+ /**
+ * Determines whether the exception is recoverable using exponential-backoff.
+ *
+ * @param ex Exception to inspect
+ * @return <code>true</code> if the exception can be recovered from, else
+ * <code>false</code>
+ */
protected static boolean isRecoverableException(AmazonServiceException ex) {
if (ex.getErrorType() == null) {
return false;
@@ -397,7 +425,7 @@ public class KinesisProxy implements KinesisProxyInterface {
return describeStreamResult;
}
- private static long fullJitterBackoff(long base, long max, double power, int attempt) {
+ protected static long fullJitterBackoff(long base, long max, double power, int attempt) {
long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9fec5cae/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 15e6cce..2e9090e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -52,11 +52,20 @@ public class AWSUtil {
* @return a new AmazonKinesis client
*/
public static AmazonKinesis createKinesisClient(Properties configProps) {
+ return createKinesisClient(configProps, new ClientConfigurationFactory().getConfig());
+ }
+
+ /**
+ * Creates an Amazon Kinesis Client.
+ * @param configProps configuration properties containing the access key, secret key, and region
+ * @param awsClientConfig preconfigured AWS SDK client configuration
+ * @return a new Amazon Kinesis Client
+ */
+ public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
// set a Flink-specific user agent
- ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
- .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
- EnvironmentInformation.getVersion(),
- EnvironmentInformation.getRevisionInformation().commitId));
+ awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
+ EnvironmentInformation.getVersion(),
+ EnvironmentInformation.getRevisionInformation().commitId));
// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
http://git-wip-us.apache.org/repos/asf/flink/blob/9fec5cae/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 7ca05d7..c84d89b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -17,12 +17,22 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import java.util.Properties;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -59,4 +69,21 @@ public class KinesisProxyTest {
assertFalse(KinesisProxy.isRecoverableException(ex));
}
+ @Test
+ public void testCustomConfigurationOverride() {
+ Properties configProps = new Properties();
+ configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+ KinesisProxy proxy = new KinesisProxy(configProps) {
+ @Override
+ protected AmazonKinesis createKinesisClient(Properties configProps) {
+ ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
+ clientConfig.setSocketTimeout(10000);
+ return AWSUtil.createKinesisClient(configProps, clientConfig);
+ }
+ };
+ AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
+ ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, "clientConfiguration");
+ assertEquals(10000, clientConfiguration.getSocketTimeout());
+ }
+
}