You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/09/11 14:11:20 UTC
flink git commit: [FLINK-2656] Fix behavior of FlinkKafkaConsumer for out of range offsets
Repository: flink
Updated Branches:
refs/heads/master a492ed922 -> 635456890
[FLINK-2656] Fix behavior of FlinkKafkaConsumer for out of range offsets
This closes #1117
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63545689
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63545689
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63545689
Branch: refs/heads/master
Commit: 635456890fe4d43a1c365d84ae45b5f34af056ce
Parents: a492ed9
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Sep 10 18:47:45 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Sep 11 14:08:41 2015 +0200
----------------------------------------------------------------------
.../kafka/internals/LegacyFetcher.java | 68 ++++++++++++++------
.../connectors/kafka/KafkaConsumerTestBase.java | 38 +++++++----
.../streaming/connectors/kafka/KafkaITCase.java | 5 ++
3 files changed, 76 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/63545689/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index 87bd075..c4ba103 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -356,29 +357,24 @@ public class LegacyFetcher implements Fetcher {
// make sure that all partitions have some offsets to start with
// those partitions that do not have an offset from a checkpoint need to get
// their start offset from ZooKeeper
-
- List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
+ {
+ List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
- for (FetchPartition fp : partitions) {
- if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
- // retrieve the offset from the consumer
- partitionsToGetOffsetsFor.add(fp);
+ for (FetchPartition fp : partitions) {
+ if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
+ // retrieve the offset from the consumer
+ partitionsToGetOffsetsFor.add(fp);
+ }
}
- }
- if (partitionsToGetOffsetsFor.size() > 0) {
- long timeType;
- if (config.getProperty("auto.offset.reset", "latest").equals("latest")) {
- timeType = OffsetRequest.LatestTime();
- } else {
- timeType = OffsetRequest.EarliestTime();
+ if (partitionsToGetOffsetsFor.size() > 0) {
+ getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+ LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
+ topic, partitionsToGetOffsetsFor);
}
- getLastOffset(consumer, topic, partitionsToGetOffsetsFor, timeType);
- LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
- topic, partitionsToGetOffsetsFor);
}
// Now, the actual work starts :-)
-
+ int OffsetOutOfRangeCount = 0;
while (running) {
FetchRequestBuilder frb = new FetchRequestBuilder();
frb.clientId(clientId);
@@ -396,14 +392,34 @@ public class LegacyFetcher implements Fetcher {
if (fetchResponse.hasError()) {
String exception = "";
+ List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
for (FetchPartition fp : partitions) {
- short code;
- if ((code = fetchResponse.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
- exception += "\nException for partition " + fp.partition + ": " +
+ short code = fetchResponse.errorCode(topic, fp.partition);
+
+ if(code == ErrorMapping.OffsetOutOfRangeCode()) {
+ // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
+ // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
+ partitionsToGetOffsetsFor.add(fp);
+ } else if(code != ErrorMapping.NoError()) {
+ exception += "\nException for partition " + fp.partition + ": " +
StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
}
}
- throw new IOException("Error while fetching from broker: " + exception);
+ if (partitionsToGetOffsetsFor.size() > 0) {
+ // safeguard against an infinite loop.
+ if(OffsetOutOfRangeCount++ > 0) {
+ throw new RuntimeException("Found invalid offsets more than once in partitions "+partitionsToGetOffsetsFor.toString()+" " +
+ "Exceptions: "+exception);
+ }
+ // get valid offsets for these partitions and try again.
+ LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
+ getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+ LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
+ continue; // jump back to create a new fetch request. The offset has not been touched.
+ } else {
+ // all partitions failed on an error
+ throw new IOException("Error while fetching from broker: " + exception);
+ }
}
int messagesInFetch = 0;
@@ -515,6 +531,16 @@ public class LegacyFetcher implements Fetcher {
fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
}
}
+
+ private static long getInvalidOffsetBehavior(Properties config) {
+ long timeType;
+ if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) {
+ timeType = OffsetRequest.LatestTime();
+ } else {
+ timeType = OffsetRequest.EarliestTime();
+ }
+ return timeType;
+ }
}
private static class PartitionInfoFetcher extends Thread {
http://git-wip-us.apache.org/repos/asf/flink/blob/63545689/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index f105183..ed1644c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -380,11 +380,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
FailingIdentityMapper.failedBefore = false;
tryExecute(env, "One-to-one exactly once test");
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
-// assertTrue("Job did not do a checkpoint before the failure",
-// FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
deleteTestTopic(topic);
}
@@ -432,11 +427,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
FailingIdentityMapper.failedBefore = false;
tryExecute(env, "One-source-multi-partitions exactly once test");
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
-// assertTrue("Job did not do a checkpoint before the failure",
-// FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
deleteTestTopic(topic);
}
@@ -485,10 +475,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
FailingIdentityMapper.failedBefore = false;
tryExecute(env, "multi-source-one-partitions exactly once test");
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
-// assertTrue("Job did not do a checkpoint before the failure",
-// FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
deleteTestTopic(topic);
}
@@ -663,6 +649,30 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
deleteTestTopic(topic);
}
+ public void runInvalidOffsetTest() throws Exception {
+ final String topic = "invalidOffsetTopic";
+ final int parallelism = 1;
+
+ // create topic
+ createTestTopic(topic, parallelism, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+
+ // write 20 messages into topic:
+ writeSequence(env, topic, 20, parallelism);
+
+ // set invalid offset:
+ ZkClient zkClient = createZookeeperClient();
+ ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topic, 0, 1234);
+
+ // read from topic
+ final int valuesCount = 20;
+ final int startFrom = 0;
+ readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);
+
+ deleteTestTopic(topic);
+ }
+
/**
* Test Flink's Kafka integration also with very big records (30MB)
* see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
http://git-wip-us.apache.org/repos/asf/flink/blob/63545689/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 334dfb2..b4511ce 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -67,6 +67,11 @@ public class KafkaITCase extends KafkaConsumerTestBase {
runFailOnDeployTest();
}
+ @Test
+ public void testInvalidOffset() throws Exception {
+ runInvalidOffsetTest();
+ }
+
// --- source to partition mappings and exactly once ---
@Test