Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/24 10:44:00 UTC

[1/4] flink git commit: [hotfix] [kafka] Add argument checks to rescaling tests in FlinkKafkaConsumerBase

Repository: flink
Updated Branches:
  refs/heads/release-1.3 ab28c8e84 -> 45a5f875b


[hotfix] [kafka] Add argument checks to rescaling tests in FlinkKafkaConsumerBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c94f0d56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c94f0d56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c94f0d56

Branch: refs/heads/release-1.3
Commit: c94f0d56f9e40ec65d8a3b57ca26fd66c164e333
Parents: 6abd402
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jul 24 17:20:52 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 18:39:38 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumerBaseTest.java  | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c94f0d56/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index de0df5f..04508dc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -36,7 +35,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -60,9 +58,7 @@ import java.util.Set;
 
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.collection.IsIn.isIn;
-import static org.hamcrest.collection.IsMapContaining.hasEntry;
 import static org.hamcrest.collection.IsMapContaining.hasKey;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
@@ -554,7 +550,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 	@Test
 	public void testScaleDown() throws Exception {
-		testRescaling(5, 10, 2, 1);
+		testRescaling(5, 10, 2, 100);
 	}
 
 	/**
@@ -570,6 +566,10 @@ public class FlinkKafkaConsumerBaseTest {
 		final int restoredParallelism,
 		final int restoredNumPartitions) throws Exception {
 
+		Preconditions.checkArgument(
+			restoredNumPartitions >= numPartitions,
+			"invalid test case for Kafka repartitioning; Kafka only allows increasing partitions.");
+
 		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = new ArrayList<>();
 		for (int i = 0; i < numPartitions; i++) {
 			mockFetchedPartitionsOnStartup.add(new KafkaTopicPartition("test-topic", i));
@@ -605,7 +605,8 @@ public class FlinkKafkaConsumerBaseTest {
 			globalSubscribedPartitions.putAll(subscribedPartitions);
 		}
 
-
+		// any new partitions after the restore should not have been picked up;
+		// global number of subscribed partitions should still equal the original number during the fresh run
 		assertThat(globalSubscribedPartitions.values(), hasSize(numPartitions));
 		assertThat(mockFetchedPartitionsOnStartup, everyItem(isIn(globalSubscribedPartitions.keySet())));
 
@@ -846,6 +847,7 @@ public class FlinkKafkaConsumerBaseTest {
 			this.isAutoCommitEnabled = isAutoCommitEnabled;
 		}
 	}
+
 	private static final class TestingListState<T> implements ListState<T> {
 
 		private final List<T> list = new ArrayList<>();
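
The guard added above is also why testScaleDown's last argument changed
from 1 to 100: restoring with fewer partitions than the fresh run can
never happen against Kafka, which only allows increasing a topic's
partition count. A minimal standalone sketch of the same check (the
class and method names below are illustrative, not part of the commit):

    import org.apache.flink.util.Preconditions;

    public class RescalingTestGuard {

        // Kafka can only ever increase a topic's partition count, so a
        // restore that claims fewer partitions than the fresh run describes
        // a state that cannot occur; failing fast keeps the test meaningful.
        static void checkRescalingArguments(int numPartitions, int restoredNumPartitions) {
            Preconditions.checkArgument(
                restoredNumPartitions >= numPartitions,
                "invalid test case for Kafka repartitioning; Kafka only allows increasing partitions.");
        }

        public static void main(String[] args) {
            checkRescalingArguments(10, 100); // valid: partitions grew from 10 to 100

            try {
                checkRescalingArguments(10, 2); // invalid: partitions cannot shrink
            } catch (IllegalArgumentException expected) {
                System.out.println("rejected as expected: " + expected.getMessage());
            }
        }
    }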


[2/4] flink git commit: [FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1

Posted by tz...@apache.org.
[FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1

This closes #4321


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6abd4029
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6abd4029
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6abd4029

Branch: refs/heads/release-1.3
Commit: 6abd40299040ca646e7e94313dd1e0d25a4c8d82
Parents: ab28c8e
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Jul 13 11:07:28 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 18:39:38 2017 +0800

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |  2 +-
 .../kafka/KafkaTestEnvironmentImpl.java         | 21 +++++++++++++++-----
 2 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6abd4029/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 9fb0e61..84f1f3d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<kafka.version>0.10.0.1</kafka.version>
+		<kafka.version>0.10.2.1</kafka.version>
 	</properties>
 
 	<dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/6abd4029/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 3094172..7bcd88b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.admin.AdminUtils;
 import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.list.UnmodifiableList;
@@ -42,8 +42,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +61,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import scala.collection.mutable.ArraySeq;
+
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -250,11 +254,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			for (int i = 0; i < numKafkaServers; i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
-				if(secureMode) {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+				if (secureMode) {
+					brokerConnectionString += hostAndPortToUrlString(
+							KafkaTestEnvironment.KAFKA_HOST,
+							brokers.get(i).socketServer().boundPort(
+									ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)));
 				} else {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+					brokerConnectionString += hostAndPortToUrlString(
+							KafkaTestEnvironment.KAFKA_HOST,
+							brokers.get(i).socketServer().boundPort(
+									ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)));
 				}
+				brokerConnectionString +=  ",";
 			}
 
 			LOG.info("ZK and KafkaServer started.");
@@ -418,7 +429,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 			try {
 				scala.Option<String> stringNone = scala.Option.apply(null);
-				KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+				KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
 				server.startup();
 				return server;
 			}


[3/4] flink git commit: [hotfix] [kafka] Make notifyCheckpointComplete method final in FlinkKafkaConsumerBase

Posted by tz...@apache.org.
[hotfix] [kafka] Make notifyCheckpointComplete method final in FlinkKafkaConsumerBase

This closes #4386.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64c19197
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64c19197
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64c19197

Branch: refs/heads/release-1.3
Commit: 64c19197d4e18410d7f1235437a40964a8bf3dda
Parents: c94f0d5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jul 24 17:23:31 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 18:41:29 2017 +0800

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64c19197/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 718f1f2..5a440e0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -585,7 +585,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+	public final void notifyCheckpointComplete(long checkpointId) throws Exception {
 		if (!running) {
 			LOG.debug("notifyCheckpointComplete() called on closed source");
 			return;
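
Making notifyCheckpointComplete final means subclasses of the consumer
base can no longer override it and accidentally bypass the offset-commit
handling the base class performs when a checkpoint completes. A minimal
sketch of that template-method idea, using made-up names rather than
Flink's actual classes:

    // Illustrative only; CheckpointedSourceBase and
    // commitOffsetsForCheckpoint are hypothetical names.
    public abstract class CheckpointedSourceBase {

        private volatile boolean running = true;

        // 'final' pins the completion protocol in the base class: subclasses
        // extend behavior through the protected hook below, but they cannot
        // skip the running-state guard or the commit step.
        public final void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (!running) {
                return; // source already closed, nothing to commit
            }
            commitOffsetsForCheckpoint(checkpointId);
        }

        // subclass-specific commit logic (e.g. committing offsets to Kafka)
        protected abstract void commitOffsetsForCheckpoint(long checkpointId) throws Exception;

        public void close() {
            running = false;
        }
    }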


[4/4] flink git commit: [FLINK-6365] [kinesis] Adapt default values of the Kinesis connector

Posted by tz...@apache.org.
[FLINK-6365] [kinesis] Adapt default values of the Kinesis connector

The previous SHARD_GETRECORDS_MAX and SHARD_GETRECORDS_INTERVAL_MILLIS
defaults did not work well with AWS's service limits, leading to poor
Kinesis connector performance when used directly out of the box. This
commit adapts them to follow the default values used by the AWS SDK.

This closes #4375.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45a5f875
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45a5f875
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45a5f875

Branch: refs/heads/release-1.3
Commit: 45a5f875b8100689e65187d05b7842f11fb19086
Parents: 64c1919
Author: Bowen Li <bo...@gmail.com>
Authored: Wed Jul 19 23:35:28 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 18:42:12 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                                   | 4 ++--
 .../connectors/kinesis/config/ConsumerConfigConstants.java       | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/45a5f875/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 1fcc529..5fbf24b 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -250,8 +250,8 @@ of this API, the consumer will retry if Kinesis complains that the data size / t
 up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput
 of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and
 `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former
-adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 100), while
-the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the
+adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 10,000), while
+the latter modifies the sleep interval between each fetch (default is 200 milliseconds). The retry behaviour of the
 consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
 
 ## Kinesis Producer

http://git-wip-us.apache.org/repos/asf/flink/blob/45a5f875/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 7c31af4..0192c57 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -118,7 +118,7 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
-	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
+	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;
 
 	public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
 
@@ -128,7 +128,7 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
-	public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L;
+	public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 200L;
 
 	public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
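
As the documentation change above explains, users who share a stream with
other consuming applications can tune these keys away from the new
defaults themselves. A minimal sketch of supplying overrides when
constructing the consumer, assuming the Flink 1.3 Kinesis connector
(the stream name, region, and override values are illustrative):

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KinesisConsumerConfigExample {

        public static void main(String[] args) throws Exception {
            Properties consumerConfig = new Properties();
            consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");

            // Back off from the new defaults (10000 records / 200 ms) when
            // per-shard read limits are shared with other consumers.
            consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "2000");
            consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(new FlinkKinesisConsumer<>(
                        "my-stream", new SimpleStringSchema(), consumerConfig))
               .print();
            env.execute("kinesis-consumer-config-example");
        }
    }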