Posted to commits@flink.apache.org by rm...@apache.org on 2015/09/11 14:11:20 UTC

flink git commit: [FLINK-2656] Fix behavior of FlinkKafkaConsumer for out of range offsets

Repository: flink
Updated Branches:
  refs/heads/master a492ed922 -> 635456890


[FLINK-2656] Fix behavior of FlinkKafkaConsumer for out of range offsets

This closes #1117
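
For orientation before the diff: a condensed sketch of the recovery path this change adds to LegacyFetcher's fetch loop. This is illustrative only, not the committed code; buildFetchRequest and collectOutOfRangePartitions are hypothetical stand-ins for logic that is inlined in the actual patch below.

    // Sketch of the new behavior: on an out-of-range offset, re-seed the start offset
    // from the broker (per 'auto.offset.reset') and retry the fetch exactly once.
    int offsetOutOfRangeCount = 0;
    while (running) {
        FetchResponse fetchResponse = consumer.fetch(buildFetchRequest(partitions)); // hypothetical helper
        if (fetchResponse.hasError()) {
            List<FetchPartition> outOfRange = collectOutOfRangePartitions(fetchResponse, partitions); // hypothetical helper
            if (outOfRange.isEmpty()) {
                // none of the partition errors is recoverable
                throw new IOException("Error while fetching from broker");
            }
            if (offsetOutOfRangeCount++ > 0) {
                // safeguard against an infinite loop: the re-seeded offsets failed again
                throw new RuntimeException("Found invalid offsets more than once in partitions " + outOfRange);
            }
            // ask the broker for valid offsets (earliest or latest, per 'auto.offset.reset') and retry
            getLastOffset(consumer, topic, outOfRange, getInvalidOffsetBehavior(config));
            continue;
        }
        // ... deserialize and emit the fetched records as before ...
    }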


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63545689
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63545689
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63545689

Branch: refs/heads/master
Commit: 635456890fe4d43a1c365d84ae45b5f34af056ce
Parents: a492ed9
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Sep 10 18:47:45 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Sep 11 14:08:41 2015 +0200

----------------------------------------------------------------------
 .../kafka/internals/LegacyFetcher.java          | 68 ++++++++++++++------
 .../connectors/kafka/KafkaConsumerTestBase.java | 38 +++++++----
 .../streaming/connectors/kafka/KafkaITCase.java |  5 ++
 3 files changed, 76 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63545689/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index 87bd075..c4ba103 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.StringUtils;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -356,29 +357,24 @@ public class LegacyFetcher implements Fetcher {
 				// make sure that all partitions have some offsets to start with
 				// those partitions that do not have an offset from a checkpoint need to get
 				// their start offset from ZooKeeper
-				
-				List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
+				{
+					List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
 
-				for (FetchPartition fp : partitions) {
-					if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
-						// retrieve the offset from the consumer
-						partitionsToGetOffsetsFor.add(fp);
+					for (FetchPartition fp : partitions) {
+						if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
+							// retrieve the offset from the consumer
+							partitionsToGetOffsetsFor.add(fp);
+						}
 					}
-				}
-				if (partitionsToGetOffsetsFor.size() > 0) {
-					long timeType;
-					if (config.getProperty("auto.offset.reset", "latest").equals("latest")) {
-						timeType = OffsetRequest.LatestTime();
-					} else {
-						timeType = OffsetRequest.EarliestTime();
+					if (partitionsToGetOffsetsFor.size() > 0) {
+						getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+						LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
+								topic, partitionsToGetOffsetsFor);
 					}
-					getLastOffset(consumer, topic, partitionsToGetOffsetsFor, timeType);
-					LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
-							topic, partitionsToGetOffsetsFor);
 				}
 				
 				// Now, the actual work starts :-)
-				
+				int OffsetOutOfRangeCount = 0;
 				while (running) {
 					FetchRequestBuilder frb = new FetchRequestBuilder();
 					frb.clientId(clientId);
@@ -396,14 +392,34 @@ public class LegacyFetcher implements Fetcher {
 
 					if (fetchResponse.hasError()) {
 						String exception = "";
+						List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
 						for (FetchPartition fp : partitions) {
-							short code;
-							if ((code = fetchResponse.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
-								exception += "\nException for partition " + fp.partition + ": " + 
+							short code = fetchResponse.errorCode(topic, fp.partition);
+
+							if(code == ErrorMapping.OffsetOutOfRangeCode()) {
+								// we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
+								// Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
+								partitionsToGetOffsetsFor.add(fp);
+							} else if(code != ErrorMapping.NoError()) {
+								exception += "\nException for partition " + fp.partition + ": " +
 										StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
 							}
 						}
-						throw new IOException("Error while fetching from broker: " + exception);
+						if (partitionsToGetOffsetsFor.size() > 0) {
+							// safeguard against an infinite loop.
+							if(OffsetOutOfRangeCount++ > 0) {
+								throw new RuntimeException("Found invalid offsets more than once in partitions "+partitionsToGetOffsetsFor.toString()+" " +
+										"Exceptions: "+exception);
+							}
+							// get valid offsets for these partitions and try again.
+							LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
+							getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+							LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
+							continue; // jump back to create a new fetch request. The offset has not been touched.
+						} else {
+							// all partitions failed on an error
+							throw new IOException("Error while fetching from broker: " + exception);
+						}
 					}
 
 					int messagesInFetch = 0;
@@ -515,6 +531,16 @@ public class LegacyFetcher implements Fetcher {
 				fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
 			}
 		}
+
+		private static long getInvalidOffsetBehavior(Properties config) {
+			long timeType;
+			if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) {
+				timeType = OffsetRequest.LatestTime();
+			} else {
+				timeType = OffsetRequest.EarliestTime();
+			}
+			return timeType;
+		}
 	}
 	
 	private static class PartitionInfoFetcher extends Thread {
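
For users of the connector, the reset behavior above is driven by the standard Kafka property; a minimal, illustrative configuration follows. The broker list, ZooKeeper quorum, group id and topic name are placeholders, and the constructor is the FlinkKafkaConsumer082 variant from the Flink 0.9.x connector.

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder broker list
    props.setProperty("zookeeper.connect", "localhost:2181");  // placeholder ZooKeeper quorum
    props.setProperty("group.id", "my-flink-group");           // placeholder consumer group
    // read by getInvalidOffsetBehavior(): "latest" resets out-of-range offsets to the
    // newest available message, any other value to the earliest one
    props.setProperty("auto.offset.reset", "earliest");

    DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), props));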

http://git-wip-us.apache.org/repos/asf/flink/blob/63545689/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index f105183..ed1644c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -380,11 +380,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		FailingIdentityMapper.failedBefore = false;
 		tryExecute(env, "One-to-one exactly once test");
 
-		// this cannot be reliably checked, as checkpoints come in time intervals, and
-		// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
 		deleteTestTopic(topic);
 	}
 
@@ -432,11 +427,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		FailingIdentityMapper.failedBefore = false;
 		tryExecute(env, "One-source-multi-partitions exactly once test");
 
-		// this cannot be reliably checked, as checkpoints come in time intervals, and
-		// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
 		deleteTestTopic(topic);
 	}
 
@@ -485,10 +475,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		FailingIdentityMapper.failedBefore = false;
 		tryExecute(env, "multi-source-one-partitions exactly once test");
 
-		// this cannot be reliably checked, as checkpoints come in time intervals, and
-		// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
 
 		deleteTestTopic(topic);
 	}
@@ -663,6 +649,30 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
+	public void runInvalidOffsetTest() throws Exception {
+		final String topic = "invalidOffsetTopic";
+		final int parallelism = 1;
+
+		// create topic
+		createTestTopic(topic, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+
+		// write 20 messages into topic:
+		writeSequence(env, topic, 20, parallelism);
+
+		// set invalid offset:
+		ZkClient zkClient = createZookeeperClient();
+		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topic, 0, 1234);
+
+		// read from topic
+		final int valuesCount = 20;
+		final int startFrom = 0;
+		readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);
+
+		deleteTestTopic(topic);
+	}
+
 	/**
 	 * Test Flink's Kafka integration also with very big records (30MB)
 	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message

http://git-wip-us.apache.org/repos/asf/flink/blob/63545689/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 334dfb2..b4511ce 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -67,6 +67,11 @@ public class KafkaITCase extends KafkaConsumerTestBase {
 		runFailOnDeployTest();
 	}
 
+	@Test
+	public void testInvalidOffset() throws Exception {
+		runInvalidOffsetTest();
+	}
+
 	// --- source to partition mappings and exactly once ---
 	
 	@Test