Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/24 03:55:14 UTC

[1/4] flink git commit: [FLINK-7224] [kafka, docs] Fix incorrect Javadoc / docs regarding Kafka partition metadata querying

Repository: flink
Updated Branches:
  refs/heads/master 3c7560853 -> 35564f25c


[FLINK-7224] [kafka, docs] Fix incorrect Javadoc / docs regarding Kafka partition metadata querying

Since Flink 1.3, partition metadata is no longer queried on the client
side. This commit removes the outdated statements describing this legacy
behaviour from the Javadocs and documentation.

This closes #4310.
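
For reference, here is a minimal sketch of a consumer setup (not part of this commit; the topic name, group id, and broker address are placeholders). Since Flink 1.3 the constructor only records the configuration, and the consumer subtasks fetch partition metadata at runtime, so the submitting client no longer needs to reach the Kafka brokers.

    // Hedged sketch of a typical FlinkKafkaConsumer010 setup; values are placeholders.
    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaConsumerSketch {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "test");

            // The constructor only records the configuration; partition metadata is
            // queried by the consumer subtasks at runtime, not by the submitting client.
            FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), properties);

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.addSource(consumer);
            stream.print();
            env.execute("kafka-consumer-sketch");
        }
    }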


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58b53748
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58b53748
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58b53748

Branch: refs/heads/master
Commit: 58b53748293c160c28c7f9d08c3a0ad23152d34f
Parents: 3c75608
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Jul 19 14:24:27 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 10:25:05 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                                   | 6 ------
 .../streaming/connectors/kafka/FlinkKafkaConsumer010.java      | 4 ----
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java | 4 ----
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java | 4 ----
 4 files changed, 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58b53748/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 23ad3b8..d4e8978 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -134,12 +134,6 @@ stream = env
 </div>
 </div>
 
-The current FlinkKafkaConsumer implementation will establish a connection from the client (when calling the constructor)
-for querying the list of topics and partitions.
-
-For this to work, the consumer needs to be able to access the consumers from the machine submitting the job to the Flink cluster.
-If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc.
-
 ### The `DeserializationSchema`
 
 The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The

http://git-wip-us.apache.org/repos/asf/flink/blob/58b53748/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index b851e3e..3a6a13b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -56,10 +56,6 @@ import java.util.Properties;
  *
  * <p>Please refer to Kafka's documentation for the available configuration properties:
  * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
- * is constructed. That means that the client that submits the program needs to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
  */
 public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58b53748/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index c4cd3e7..f41a4e3 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -78,10 +78,6 @@ import static org.apache.flink.util.PropertiesUtil.getLong;
  *
  * <p>When using a Kafka topic to send data between Flink jobs, we recommend using the
  * {@see TypeInformationSerializationSchema} and {@see TypeInformationKeyValueSerializationSchema}.</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
- * is constructed. That means that the client that submits the program needs to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
  */
 public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58b53748/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 48899db..0cf40e6 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -62,10 +62,6 @@ import static org.apache.flink.util.PropertiesUtil.getLong;
  *
  * <p>Please refer to Kafka's documentation for the available configuration properties:
  * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
- * is constructed. That means that the client that submits the program needs to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
  */
 public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 


[3/4] flink git commit: [FLINK-7143] [kafka, tests] Stricter tests for deterministic partition assignment

Posted by tz...@apache.org.
[FLINK-7143] [kafka, tests] Stricter tests for deterministic partition assignment

Previous test coverage for partition assignment was not strict enough.
It did not verify that partitions are assigned to the exact expected
subtasks, which is crucial for verifying that partition-to-subtask
assignments are deterministic and insensitive to the ordering of the
fetched partition metadata.

This closes #4302.
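
For reference, the determinism the new test asserts reduces to the rule used in the test helper getExpectedSubtaskIndex (see the diff below): a partition's target subtask is derived from the partition's hash and the parallelism alone, so the order in which partitions were fetched cannot change the assignment. A standalone sketch with made-up hash values:

    // Sketch of the assignment rule asserted by the new test helper; the hash
    // values below are made up for illustration.
    import java.util.Arrays;
    import java.util.List;

    public class AssignmentSketch {
        static int expectedSubtaskIndex(int partitionHash, int numSubtasks) {
            // Mirrors Math.abs(partition.hashCode() % numTasks) from the test.
            return Math.abs(partitionHash % numSubtasks);
        }

        public static void main(String[] args) {
            int numSubtasks = 4;
            List<Integer> partitionHashes = Arrays.asList(17, 42, -7, 99, 3, 1024);
            for (int hash : partitionHashes) {
                // The result is the same no matter how the list above is ordered.
                System.out.println(hash + " -> subtask " + expectedSubtaskIndex(hash, numSubtasks));
            }
        }
    }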


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e16e9d99
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e16e9d99
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e16e9d99

Branch: refs/heads/master
Commit: e16e9d99fded74e2c92083e3734ba94eea927fb0
Parents: c65afdb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Jul 12 02:36:00 2017 +0900
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 10:28:04 2017 +0800

----------------------------------------------------------------------
 .../AbstractPartitionDiscovererTest.java        | 77 ++++++++++++++++++--
 1 file changed, 71 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e16e9d99/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
index 71c4c1b..b00d74d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
@@ -72,10 +72,10 @@ public class AbstractPartitionDiscovererTest {
 			new KafkaTopicPartition(TEST_TOPIC, 2),
 			new KafkaTopicPartition(TEST_TOPIC, 3));
 
-		for (int i = 0; i < mockGetAllPartitionsForTopicsReturn.size(); i++) {
+		for (int subtaskIndex = 0; subtaskIndex < mockGetAllPartitionsForTopicsReturn.size(); subtaskIndex++) {
 			TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 					topicsDescriptor,
-					i,
+					subtaskIndex,
 					mockGetAllPartitionsForTopicsReturn.size(),
 					createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
 					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
@@ -84,6 +84,9 @@ public class AbstractPartitionDiscovererTest {
 			List<KafkaTopicPartition> initialDiscovery = partitionDiscoverer.discoverPartitions();
 			assertEquals(1, initialDiscovery.size());
 			assertTrue(contains(mockGetAllPartitionsForTopicsReturn, initialDiscovery.get(0).getPartition()));
+			assertEquals(
+				getExpectedSubtaskIndex(initialDiscovery.get(0), mockGetAllPartitionsForTopicsReturn.size()),
+				subtaskIndex);
 
 			// subsequent discoveries should not find anything
 			List<KafkaTopicPartition> secondDiscovery = partitionDiscoverer.discoverPartitions();
@@ -111,10 +114,10 @@ public class AbstractPartitionDiscovererTest {
 			final int minPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers;
 			final int maxPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers + 1;
 
-			for (int i = 0; i < numConsumers; i++) {
+			for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
 				TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 						topicsDescriptor,
-						i,
+						subtaskIndex,
 						numConsumers,
 						createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
 						createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
@@ -127,6 +130,7 @@ public class AbstractPartitionDiscovererTest {
 				for (KafkaTopicPartition p : initialDiscovery) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
+					assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
 				}
 
 				// subsequent discoveries should not find anything
@@ -159,10 +163,10 @@ public class AbstractPartitionDiscovererTest {
 
 			final int numConsumers = 2 * mockGetAllPartitionsForTopicsReturn.size() + 3;
 
-			for (int i = 0; i < numConsumers; i++) {
+			for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
 				TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 						topicsDescriptor,
-						i,
+						subtaskIndex,
 						numConsumers,
 						createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
 						createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
@@ -174,6 +178,7 @@ public class AbstractPartitionDiscovererTest {
 				for (KafkaTopicPartition p : initialDiscovery) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
+					assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
 				}
 
 				// subsequent discoveries should not find anything
@@ -255,16 +260,19 @@ public class AbstractPartitionDiscovererTest {
 			for (KafkaTopicPartition p : initialDiscoverySubtask0) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask1) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask2) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
 			}
 
 			// all partitions must have been assigned
@@ -291,26 +299,32 @@ public class AbstractPartitionDiscovererTest {
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask0) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask1) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask2) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask0) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask1) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask2) {
 				assertTrue(allNewPartitions.remove(p));
+				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
 			}
 
 			// all partitions must have been assigned
@@ -322,6 +336,53 @@ public class AbstractPartitionDiscovererTest {
 		}
 	}
 
+	@Test
+	public void testDeterministicAssignmentWithDifferentFetchedPartitionOrdering() throws Exception {
+		int numSubtasks = 4;
+
+		List<KafkaTopicPartition> mockGetAllPartitionsForTopicsReturn = Arrays.asList(
+			new KafkaTopicPartition("test-topic", 0),
+			new KafkaTopicPartition("test-topic", 1),
+			new KafkaTopicPartition("test-topic", 2),
+			new KafkaTopicPartition("test-topic", 3),
+			new KafkaTopicPartition("test-topic2", 0),
+			new KafkaTopicPartition("test-topic2", 1));
+
+		List<KafkaTopicPartition> mockGetAllPartitionsForTopicsReturnOutOfOrder = Arrays.asList(
+			new KafkaTopicPartition("test-topic", 3),
+			new KafkaTopicPartition("test-topic", 1),
+			new KafkaTopicPartition("test-topic2", 1),
+			new KafkaTopicPartition("test-topic", 0),
+			new KafkaTopicPartition("test-topic2", 0),
+			new KafkaTopicPartition("test-topic", 2));
+
+		for (int subtaskIndex = 0; subtaskIndex < numSubtasks; subtaskIndex++) {
+			TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
+					topicsDescriptor,
+					subtaskIndex,
+					numSubtasks,
+					createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
+					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
+			partitionDiscoverer.open();
+
+			TestPartitionDiscoverer partitionDiscovererOutOfOrder = new TestPartitionDiscoverer(
+					topicsDescriptor,
+					subtaskIndex,
+					numSubtasks,
+					createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
+					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
+			partitionDiscovererOutOfOrder.open();
+
+			List<KafkaTopicPartition> discoveredPartitions = partitionDiscoverer.discoverPartitions();
+			List<KafkaTopicPartition> discoveredPartitionsOutOfOrder = partitionDiscovererOutOfOrder.discoverPartitions();
+
+			// the subscribed partitions should be identical, regardless of the input partition ordering
+			Collections.sort(discoveredPartitions, new KafkaTopicPartition.Comparator());
+			Collections.sort(discoveredPartitionsOutOfOrder, new KafkaTopicPartition.Comparator());
+			assertEquals(discoveredPartitions, discoveredPartitionsOutOfOrder);
+		}
+	}
+
 	private static class TestPartitionDiscoverer extends AbstractPartitionDiscoverer {
 
 		private final KafkaTopicsDescriptor topicsDescriptor;
@@ -425,4 +486,8 @@ public class AbstractPartitionDiscovererTest {
 
 		return clone;
 	}
+
+	private static int getExpectedSubtaskIndex(KafkaTopicPartition partition, int numTasks) {
+		return Math.abs(partition.hashCode() % numTasks);
+	}
 }


[2/4] flink git commit: [hotfix] [kafka, tests] Commit read offsets in Kafka integration tests

Posted by tz...@apache.org.
[hotfix] [kafka, tests] Commit read offsets in Kafka integration tests

Previously, offsets were not committed, so the same records could be read more than once.
This was not a big issue, because so far these methods were used only for at-least-once tests.

This closes #4310.
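
For reference, a condensed sketch of the pattern applied in the changes below, using the plain Kafka 0.9+ consumer API (property values are placeholders): drain the partition, then commit the read offsets so a later read with the same group.id does not see the same records again.

    // Hedged sketch of the read-then-commit pattern; configuration is assumed.
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ReadAndCommitSketch {
        public static void readAll(Properties properties, String topic, int partition, long timeout) {
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties)) {
                consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));
                while (true) {
                    boolean gotRecords = false;
                    for (ConsumerRecord<byte[], byte[]> record : consumer.poll(timeout)) {
                        gotRecords = true; // collect or process the record here
                    }
                    if (!gotRecords) {
                        break;
                    }
                }
                // Without this, offsets are never committed and the same records
                // can be read again by the next consumer in the same group.
                consumer.commitSync();
            }
        }
    }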


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c65afdbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c65afdbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c65afdbe

Branch: refs/heads/master
Commit: c65afdbe6a711c8df0677c11285dc317dac3046e
Parents: 58b5374
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Jul 12 14:01:49 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 10:25:21 2017 +0800

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         | 33 +++++++++++---------
 .../kafka/KafkaTestEnvironmentImpl.java         | 24 +++++++-------
 .../kafka/KafkaTestEnvironmentImpl.java         | 26 ++++++++-------
 3 files changed, 45 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c65afdbe/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index f437060..d3b45a9 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -127,23 +127,26 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
 		List<ConsumerRecord<K, V>> result = new ArrayList<>();
-		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
-		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
-
-		while (true) {
-			boolean processedAtLeastOneRecord = false;
-
-			// wait for new records with timeout and break the loop if we didn't get any
-			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-			while (iterator.hasNext()) {
-				ConsumerRecord<K, V> record = iterator.next();
-				result.add(record);
-				processedAtLeastOneRecord = true;
-			}
 
-			if (!processedAtLeastOneRecord) {
-				break;
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+			while (true) {
+				boolean processedAtLeastOneRecord = false;
+
+				// wait for new records with timeout and break the loop if we didn't get any
+				Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+				while (iterator.hasNext()) {
+					ConsumerRecord<K, V> record = iterator.next();
+					result.add(record);
+					processedAtLeastOneRecord = true;
+				}
+
+				if (!processedAtLeastOneRecord) {
+					break;
+				}
 			}
+			consumer.commitSync();
 		}
 
 		return UnmodifiableList.decorate(result);

http://git-wip-us.apache.org/repos/asf/flink/blob/c65afdbe/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 4791716..ab976e1 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -118,19 +118,21 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
 		List<ConsumerRecord<K, V>> result = new ArrayList<>();
-		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
-		consumer.subscribe(new TopicPartition(topic, partition));
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.subscribe(new TopicPartition(topic, partition));
 
-		while (true) {
-			Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
-			if (topics == null || !topics.containsKey(topic)) {
-				break;
-			}
-			List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
-			result.addAll(records);
-			if (records.size() == 0) {
-				break;
+			while (true) {
+				Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
+				if (topics == null || !topics.containsKey(topic)) {
+					break;
+				}
+				List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
+				result.addAll(records);
+				if (records.size() == 0) {
+					break;
+				}
 			}
+			consumer.commit(true);
 		}
 
 		return UnmodifiableList.decorate(result);

http://git-wip-us.apache.org/repos/asf/flink/blob/c65afdbe/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 710d917..df95420 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -110,22 +110,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
 		List<ConsumerRecord<K, V>> result = new ArrayList<>();
-		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
-		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
 
-		while (true) {
-			boolean processedAtLeastOneRecord = false;
+			while (true) {
+				boolean processedAtLeastOneRecord = false;
 
-			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-			while (iterator.hasNext()) {
-				ConsumerRecord<K, V> record = iterator.next();
-				result.add(record);
-				processedAtLeastOneRecord = true;
-			}
+				Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+				while (iterator.hasNext()) {
+					ConsumerRecord<K, V> record = iterator.next();
+					result.add(record);
+					processedAtLeastOneRecord = true;
+				}
 
-			if (!processedAtLeastOneRecord) {
-				break;
+				if (!processedAtLeastOneRecord) {
+					break;
+				}
 			}
+			consumer.commitSync();
 		}
 
 		return UnmodifiableList.decorate(result);


[4/4] flink git commit: [FLINK-6365] [kinesis] Adapt default values of the Kinesis connector

Posted by tz...@apache.org.
[FLINK-6365] [kinesis] Adapt default values of the Kinesis connector

The previous defaults for SHARD_GETRECORDS_MAX and SHARD_GETRECORDS_INTERVAL_MILLIS
did not work well with AWS's service limitations, leading to poor Kinesis connector
performance when used directly out-of-the-box. This commit adapts them to
follow the default values used by the AWS SDK.

This closes #4375.
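
For reference, a hedged sketch (not part of this commit) of how these two settings can be tuned explicitly through the supplied configuration properties; the stream name and region are placeholders.

    // Sketch of passing the GetRecords settings to the Kinesis consumer.
    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KinesisConsumerSketch {
        public static void main(String[] args) throws Exception {
            Properties consumerConfig = new Properties();
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
            // Explicitly pin the values this commit now uses as defaults.
            consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
            consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
                "my-stream", new SimpleStringSchema(), consumerConfig));
            kinesis.print();
            env.execute("kinesis-consumer-sketch");
        }
    }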


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35564f25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35564f25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35564f25

Branch: refs/heads/master
Commit: 35564f25c844b827ce325453b5d518416e1bd5a8
Parents: e16e9d9
Author: Bowen Li <bo...@gmail.com>
Authored: Wed Jul 19 23:35:28 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon Jul 24 10:36:41 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                                   | 4 ++--
 .../connectors/kinesis/config/ConsumerConfigConstants.java       | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35564f25/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 1fcc529..5fbf24b 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -250,8 +250,8 @@ of this API, the consumer will retry if Kinesis complains that the data size / t
 up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput
 of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and
 `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former
-adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 100), while
-the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the
+adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 10,000), while
+the latter modifies the sleep interval between each fetch (default is 200). The retry behaviour of the
 consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
 
 ## Kinesis Producer

http://git-wip-us.apache.org/repos/asf/flink/blob/35564f25/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 8362776..702ed27 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -119,7 +119,7 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
-	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
+	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;
 
 	public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
 
@@ -129,7 +129,7 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
-	public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L;
+	public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 200L;
 
 	public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;