Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/14 17:00:33 UTC
[1/2] flink git commit: [FLINK-5949] [yarn] Don't check Kerberos credentials for non-Kerberos authentication methods
Repository: flink
Updated Branches:
refs/heads/master dabeb74c1 -> 90c7415b0
[FLINK-5949] [yarn] Don't check Kerberos credentials for non-Kerberos authentication methods
This closes #3528.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87779ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87779ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87779ad0
Branch: refs/heads/master
Commit: 87779ad034957923e316c1100becff8288e5c9ca
Parents: dabeb74
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Mar 14 13:42:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Mar 15 00:30:47 2017 +0800
----------------------------------------------------------------------
.../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/87779ad0/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 483e279..8928f0a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -414,10 +414,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
// so we check only in ticket cache scenario.
boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+
UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
- if (useTicketCache && !loginUser.hasKerberosCredentials()) {
- LOG.error("Hadoop security is enabled but the login user does not have Kerberos credentials");
- throw new RuntimeException("Hadoop security is enabled but the login user " +
+ if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
+ && useTicketCache && !loginUser.hasKerberosCredentials()) {
+ LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
+ throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
"does not have Kerberos credentials");
}
}
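In effect, the credential check now fires only for Kerberos logins; SIMPLE (or any other non-Kerberos) authentication method passes through untouched. Below is a condensed, self-contained sketch of the patched guard, assuming only Hadoop's UserGroupInformation API; the class and method names are illustrative, not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;

    final class KerberosCredentialCheck {

        // Mirrors the patched condition: the credential check is skipped
        // entirely unless the login user actually authenticated via Kerberos.
        static void validate(boolean useTicketCache) throws IOException {
            UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
            if (loginUser.getAuthenticationMethod() == AuthenticationMethod.KERBEROS
                    && useTicketCache
                    && !loginUser.hasKerberosCredentials()) {
                throw new RuntimeException("Hadoop security with Kerberos is enabled " +
                    "but the login user does not have Kerberos credentials");
            }
        }
    }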
[2/2] flink git commit: [FLINK-3398] [kafka] Allow disabling offset committing for FlinkKafkaConsumer
Posted by tz...@apache.org.
[FLINK-3398] [kafka] Allow disabling offset committing for FlinkKafkaConsumer
This closes #3527.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90c7415b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90c7415b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90c7415b
Branch: refs/heads/master
Commit: 90c7415b03b0b1ec28fec9dcd68e426321ad7270
Parents: 87779ad
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Mar 13 13:49:08 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Mar 15 00:58:17 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kafka.md | 28 ++++
.../connectors/kafka/FlinkKafkaConsumer010.java | 8 +-
.../kafka/internal/Kafka010Fetcher.java | 2 -
.../connectors/kafka/Kafka010FetcherTest.java | 3 -
.../connectors/kafka/FlinkKafkaConsumer08.java | 21 ++-
.../connectors/kafka/Kafka08ITCase.java | 3 +
.../connectors/kafka/KafkaConsumer08Test.java | 31 +++-
.../connectors/kafka/FlinkKafkaConsumer09.java | 15 +-
.../kafka/internal/Kafka09Fetcher.java | 7 -
.../connectors/kafka/Kafka09FetcherTest.java | 3 -
.../kafka/FlinkKafkaConsumerBase.java | 149 +++++++++++++-----
.../kafka/config/OffsetCommitMode.java | 36 +++++
.../kafka/config/OffsetCommitModes.java | 47 ++++++
.../kafka/internals/AbstractFetcher.java | 8 +-
.../FlinkKafkaConsumerBaseMigrationTest.java | 9 +-
.../kafka/FlinkKafkaConsumerBaseTest.java | 154 ++++++++++++++++++-
.../org/apache/flink/util/PropertiesUtil.java | 18 +++
17 files changed, 458 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 6d58b0c..60a8039 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -292,6 +292,34 @@ Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
+### Kafka Consumers Offset Committing Behaviour Configuration
+
+The Flink Kafka Consumer allows configuring how offsets
+are committed back to Kafka brokers (or Zookeeper in 0.8). Note that the
+Flink Kafka Consumer does not rely on the committed offsets for fault
+tolerance guarantees. The committed offsets are only a means to expose
+the consumer's progress for monitoring purposes.
+
+The way to configure offset commit behaviour is different, depending on
+whether or not checkpointing is enabled for the job.
+
+ - *Checkpointing disabled:* if checkpointing is disabled, the Flink Kafka
+ Consumer relies on the automatic periodic offset committing capability
+ of the internally used Kafka clients. Therefore, to disable or enable offset
+ committing, simply set the `enable.auto.commit` (or `auto.commit.enable`
+ for Kafka 0.8) / `auto.commit.interval.ms` keys to appropriate values
+ in the provided `Properties` configuration.
+
+ - *Checkpointing enabled:* if checkpointing is enabled, the Flink Kafka
+ Consumer will commit the offsets stored in the checkpointed states when
+ the checkpoints are completed. This ensures that the committed offsets
+ in the Kafka brokers are consistent with the offsets in the checkpointed states.
+ Users can enable or disable offset committing by calling the
+ `setCommitOffsetsOnCheckpoints(boolean)` method on the consumer (by default,
+ this is set to `true`).
+ Note that in this scenario, the automatic periodic offset committing
+ settings in `Properties` are completely ignored.
+
### Kafka Consumers and Timestamp Extraction/Watermark Emission
In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.
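As a usage sketch of the new setter (hedged: the broker address, group id, and topic name below are placeholders):

    import java.util.Properties;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class CommitOnCheckpointsExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000); // checkpointing enabled

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "my-group");                // placeholder

            FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
                "my-topic", new SimpleStringSchema(), props);         // placeholder topic

            // default is true (commit on completed checkpoints); opt out explicitly:
            consumer.setCommitOffsetsOnCheckpoints(false);

            env.addSource(consumer).print();
            env.execute();
        }
    }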
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 716fa19..9e06d6e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -20,12 +20,14 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
import java.util.Collections;
@@ -131,9 +133,10 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext) throws Exception {
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode) throws Exception {
- boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
+ boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
return new Kafka010Fetcher<>(
sourceContext,
@@ -143,7 +146,6 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
- runtimeContext.isCheckpointingEnabled(),
runtimeContext.getTaskNameWithSubtasks(),
runtimeContext.getMetricGroup(),
deserializer,
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index da6ecd0..586d841 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -52,7 +52,6 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
- boolean enableCheckpointing,
String taskNameWithSubtasks,
MetricGroup metricGroup,
KeyedDeserializationSchema<T> deserializer,
@@ -68,7 +67,6 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
- enableCheckpointing,
taskNameWithSubtasks,
metricGroup,
deserializer,
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 17ba712..2d0551d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -130,7 +130,6 @@ public class Kafka010FetcherTest {
new TestProcessingTimeService(),
10,
getClass().getClassLoader(),
- false, /* checkpointing */
"taskname-with-subtask",
new UnregisteredMetricsGroup(),
schema,
@@ -267,7 +266,6 @@ public class Kafka010FetcherTest {
new TestProcessingTimeService(),
10,
getClass().getClassLoader(),
- false, /* checkpointing */
"taskname-with-subtask",
new UnregisteredMetricsGroup(),
schema,
@@ -382,7 +380,6 @@ public class Kafka010FetcherTest {
new TestProcessingTimeService(),
10, /* watermark interval */
this.getClass().getClassLoader(),
- true, /* checkpointing */
"task_name",
new UnregisteredMetricsGroup(),
schema,
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index bf7ed02..858a790 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -27,6 +27,7 @@ import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -112,9 +113,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
/** The properties to parametrize the Kafka consumer and ZooKeeper client */
private final Properties kafkaProperties;
- /** The interval in which to automatically commit (-1 if deactivated) */
- private final long autoCommitInterval;
-
// ------------------------------------------------------------------------
/**
@@ -187,8 +185,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
// eagerly check for invalid "auto.offset.reset" values before launching the job
validateAutoOffsetResetValue(props);
-
- this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000);
}
@Override
@@ -197,9 +193,14 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext) throws Exception {
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode) throws Exception {
+
+ boolean useMetrics = !PropertiesUtil.getBoolean(kafkaProperties, KEY_DISABLE_METRICS, false);
- boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));
+ long autoCommitInterval = (offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC)
+ ? PropertiesUtil.getLong(kafkaProperties, "auto.commit.interval.ms", 60000)
+ : -1; // this disables the periodic offset committer thread in the fetcher
return new Kafka08Fetcher<>(
sourceContext,
@@ -232,6 +233,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
return partitionInfos;
}
+ @Override
+ protected boolean getIsAutoCommitEnabled() {
+ return PropertiesUtil.getBoolean(kafkaProperties, "auto.commit.enable", true) &&
+ PropertiesUtil.getLong(kafkaProperties, "auto.commit.interval.ms", 60000) > 0;
+ }
+
// ------------------------------------------------------------------------
// Kafka / ZooKeeper communication utilities
// ------------------------------------------------------------------------
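With checkpointing disabled, the 0.8 consumer now derives its periodic committer purely from the client properties. A sketch of disabling it via the 0.8 key names (connection strings are placeholders):

    import java.util.Properties;

    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181");  // placeholder
    props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
    props.setProperty("group.id", "my-group");                 // placeholder

    // Kafka 0.8 key names: either setting makes getIsAutoCommitEnabled()
    // return false, so the fetcher's periodic offset committer thread
    // is never started (autoCommitInterval resolves to -1 above).
    props.setProperty("auto.commit.enable", "false");
    // or: props.setProperty("auto.commit.interval.ms", "0");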
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 2e7c368..f5cb8c0 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -202,6 +202,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
// at least once.
Properties readProps = new Properties();
readProps.putAll(standardProps);
+
+ // make sure that auto commit is enabled in the properties
+ readProps.setProperty("auto.commit.enable", "true");
readProps.setProperty("auto.commit.interval.ms", "500");
// read so that the offset can be committed to ZK
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
index 5ae74d7..83cdd90 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -21,21 +21,36 @@ package org.apache.flink.streaming.connectors.kafka;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
+import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -96,7 +111,12 @@ public class KafkaConsumer08Test {
props.setProperty("group.id", "non-existent-group");
props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "1");
- FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+ FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
+ Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+ StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+ Mockito.when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+ consumer.setRuntimeContext(mockRuntimeContext);
+
consumer.open(new Configuration());
fail();
}
@@ -118,8 +138,13 @@ public class KafkaConsumer08Test {
String zookeeperConnect = "localhost:56794";
String groupId = "non-existent-group";
Properties props = createKafkaProps(zookeeperConnect, unknownHost, groupId);
- FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
- new SimpleStringSchema(), props);
+
+ FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
+ Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+ StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+ Mockito.when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+ consumer.setRuntimeContext(mockRuntimeContext);
+
consumer.open(new Configuration());
fail();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index c7236a2..d0284ce 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -20,12 +20,14 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -174,9 +176,10 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext) throws Exception {
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode) throws Exception {
- boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
+ boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
return new Kafka09Fetcher<>(
sourceContext,
@@ -186,14 +189,12 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
- runtimeContext.isCheckpointingEnabled(),
runtimeContext.getTaskNameWithSubtasks(),
runtimeContext.getMetricGroup(),
deserializer,
properties,
pollTimeout,
useMetrics);
-
}
@Override
@@ -229,6 +230,12 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
return partitions;
}
+ @Override
+ protected boolean getIsAutoCommitEnabled() {
+ return PropertiesUtil.getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
+ PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index c389486..1c87542 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -76,7 +75,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
- boolean enableCheckpointing,
String taskNameWithSubtasks,
MetricGroup metricGroup,
KeyedDeserializationSchema<T> deserializer,
@@ -99,11 +97,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
addOffsetStateGauge(kafkaMetricGroup);
-
- // if checkpointing is enabled, we are not automatically committing to Kafka.
- kafkaProperties.setProperty(
- ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
- Boolean.toString(!enableCheckpointing));
this.consumerThread = new KafkaConsumerThread(
LOG,
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 49144e6..6e13db2 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -130,7 +130,6 @@ public class Kafka09FetcherTest {
new TestProcessingTimeService(),
10, /* watermark interval */
this.getClass().getClassLoader(),
- true, /* checkpointing */
"task_name",
new UnregisteredMetricsGroup(),
schema,
@@ -267,7 +266,6 @@ public class Kafka09FetcherTest {
new TestProcessingTimeService(),
10, /* watermark interval */
this.getClass().getClassLoader(),
- true, /* checkpointing */
"task_name",
new UnregisteredMetricsGroup(),
schema,
@@ -382,7 +380,6 @@ public class Kafka09FetcherTest {
new TestProcessingTimeService(),
10, /* watermark interval */
this.getClass().getClassLoader(),
- true, /* checkpointing */
"task_name",
new UnregisteredMetricsGroup(),
schema,
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 027751c..6858509 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -37,6 +37,8 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -103,8 +105,21 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
+ /**
+ * User-set flag determining whether or not to commit on checkpoints.
+ * Note: this flag does not represent the final offset commit mode.
+ */
+ private boolean enableCommitOnCheckpoints = true;
+
+ /**
+ * The offset commit mode for the consumer.
+ * The value of this can only be determined in {@link FlinkKafkaConsumerBase#open(Configuration)} since it depends
+ * on whether or not checkpointing is enabled for the job.
+ */
+ private OffsetCommitMode offsetCommitMode;
+
/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */
- protected StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+ private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS} */
protected Map<KafkaTopicPartition, Long> specificStartupOffsets;
@@ -161,7 +176,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* {@link AssignerWithPeriodicWatermarks}, not both at the same time.
*
* @param assigner The timestamp assigner / watermark generator to use.
- * @return The consumer object, to allow function chaining.
+ * @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
checkNotNull(assigner);
@@ -196,7 +211,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
* {@link AssignerWithPeriodicWatermarks}, not both at the same time.
*
* @param assigner The timestamp assigner / watermark generator to use.
- * @return The consumer object, to allow function chaining.
+ * @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
checkNotNull(assigner);
@@ -214,6 +229,20 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
/**
+ * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
+ *
+ * This setting will only have effect if checkpointing is enabled for the job.
+ * If checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
+ * property settings will be used to enable or disable offset committing.
+ *
+ * @param commitOnCheckpoints whether or not to commit offsets back to Kafka on checkpoints.
+ * @return The consumer object, to allow function chaining.
+ */
+ public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
+ this.enableCommitOnCheckpoints = commitOnCheckpoints;
+ return this;
+ }
+
+ /**
* Specifies the consumer to start reading from the earliest offset for all partitions.
* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
*
@@ -295,6 +324,29 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
@Override
public void open(Configuration configuration) {
+ // determine the offset commit mode
+ offsetCommitMode = OffsetCommitModes.fromConfiguration(
+ getIsAutoCommitEnabled(),
+ enableCommitOnCheckpoints,
+ ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
+
+ switch (offsetCommitMode) {
+ case ON_CHECKPOINTS:
+ LOG.info("Consumer subtask {} will commit offsets back to Kafka on completed checkpoints.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ break;
+ case KAFKA_PERIODIC:
+ LOG.info("Consumer subtask {} will commit offsets back to Kafka periodically using the Kafka client's auto commit.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ break;
+ default:
+ case DISABLED:
+ LOG.info("Consumer subtask {} has disabled offset committing back to Kafka." +
+ " This does not compromise Flink's checkpoint integrity.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+
+ // initialize subscribed partitions
List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
@@ -377,9 +429,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
// create the fetcher that will communicate with the Kafka brokers
final AbstractFetcher<T, ?> fetcher = createFetcher(
- sourceContext, subscribedPartitionsToStartOffsets,
- periodicWatermarkAssigner, punctuatedWatermarkAssigner,
- (StreamingRuntimeContext) getRuntimeContext());
+ sourceContext,
+ subscribedPartitionsToStartOffsets,
+ periodicWatermarkAssigner,
+ punctuatedWatermarkAssigner,
+ (StreamingRuntimeContext) getRuntimeContext(),
+ offsetCommitMode);
// publish the reference, for snapshot-, commit-, and cancel calls
// IMPORTANT: We can only do that now, because only now will calls to
@@ -485,15 +540,19 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
offsetsStateForCheckpoint.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
- // the map cannot be asynchronously updated, because only one checkpoint call can happen
- // on this function at a time: either snapshotState() or notifyCheckpointComplete()
- pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+ // the map cannot be asynchronously updated, because only one checkpoint call can happen
+ // on this function at a time: either snapshotState() or notifyCheckpointComplete()
+ pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
+ }
} else {
HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
- // the map cannot be asynchronously updated, because only one checkpoint call can happen
- // on this function at a time: either snapshotState() or notifyCheckpointComplete()
- pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+ // the map cannot be asynchronously updated, because only one checkpoint call can happen
+ // on this function at a time: either snapshotState() or notifyCheckpointComplete()
+ pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+ }
for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
offsetsStateForCheckpoint.add(
@@ -501,9 +560,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
}
- // truncate the map of pending offsets to commit, to prevent infinite growth
- while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
- pendingOffsetsToCommit.remove(0);
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+ // truncate the map of pending offsets to commit, to prevent infinite growth
+ while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+ pendingOffsetsToCommit.remove(0);
+ }
}
}
}
@@ -533,39 +594,40 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
LOG.debug("notifyCheckpointComplete() called on uninitialized source");
return;
}
-
- // only one commit operation must be in progress
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
- }
- try {
- final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
- if (posInMap == -1) {
- LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
- return;
+ if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+ // only one commit operation must be in progress
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
}
- @SuppressWarnings("unchecked")
- HashMap<KafkaTopicPartition, Long> offsets =
+ try {
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+ return;
+ }
+
+ @SuppressWarnings("unchecked")
+ HashMap<KafkaTopicPartition, Long> offsets =
(HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
- // remove older checkpoints in map
- for (int i = 0; i < posInMap; i++) {
- pendingOffsetsToCommit.remove(0);
- }
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
- if (offsets == null || offsets.size() == 0) {
- LOG.debug("Checkpoint state was empty.");
- return;
- }
- fetcher.commitInternalOffsetsToKafka(offsets);
- }
- catch (Exception e) {
- if (running) {
- throw e;
+ if (offsets == null || offsets.size() == 0) {
+ LOG.debug("Checkpoint state was empty.");
+ return;
+ }
+ fetcher.commitInternalOffsetsToKafka(offsets);
+ } catch (Exception e) {
+ if (running) {
+ throw e;
+ }
+ // else ignore exception if we are no longer running
}
- // else ignore exception if we are no longer running
}
}
@@ -592,9 +654,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext) throws Exception;
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode) throws Exception;
protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> topics);
+
+ protected abstract boolean getIsAutoCommitEnabled();
// ------------------------------------------------------------------------
// ResultTypeQueryable methods
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
new file mode 100644
index 0000000..8bb75b4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+/**
+ * The offset commit mode represents how offsets are externally committed
+ * back to Kafka brokers / Zookeeper.
+ *
+ * The exact value of this is determined at runtime in the consumer subtasks.
+ */
+public enum OffsetCommitMode {
+
+ /** Completely disable offset committing. */
+ DISABLED,
+
+ /** Commit offsets back to Kafka only when checkpoints are completed. */
+ ON_CHECKPOINTS,
+
+ /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
+ KAFKA_PERIODIC;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
new file mode 100644
index 0000000..9e1d9d5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+/**
+ * Utilities for {@link OffsetCommitMode}.
+ */
+public class OffsetCommitModes {
+
+ /**
+ * Determine the offset commit mode using several configuration values.
+ *
+ * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
+ * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
+ * @param enableCheckpointing whether or not checkpointing is enabled for the consumer.
+ *
+ * @return the offset commit mode to use, based on the configuration values.
+ */
+ public static OffsetCommitMode fromConfiguration(
+ boolean enableAutoCommit,
+ boolean enableCommitOnCheckpoint,
+ boolean enableCheckpointing) {
+
+ if (enableCheckpointing) {
+ // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
+ return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
+ } else {
+ // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
+ return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
+ }
+ }
+}
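For reference, how fromConfiguration resolves the three flags (arguments in order: enableAutoCommit, enableCommitOnCheckpoint, enableCheckpointing):

    // checkpointing enabled: the Kafka auto-commit flag is ignored entirely
    OffsetCommitModes.fromConfiguration(true,  true,  true);   // -> ON_CHECKPOINTS
    OffsetCommitModes.fromConfiguration(true,  false, true);   // -> DISABLED

    // checkpointing disabled: only the Kafka auto-commit flag matters
    OffsetCommitModes.fromConfiguration(true,  true,  false);  // -> KAFKA_PERIODIC
    OffsetCommitModes.fromConfiguration(false, true,  false);  // -> DISABLED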
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index b8ac980..0b311a9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
@@ -166,11 +167,14 @@ public abstract class AbstractFetcher<T, KPH> {
/**
* Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
- * older Kafka versions). The given offsets are the internal checkpointed offsets, representing
+ * older Kafka versions). This method is only ever called when the offset commit mode of
+ * the consumer is {@link OffsetCommitMode#ON_CHECKPOINTS}.
+ *
+ * The given offsets are the internal checkpointed offsets, representing
* the last processed record of each partition. Version-specific implementations of this method
* need to hold the contract that the given offsets must be incremented by 1 before
* committing them, so that committed offsets to Kafka represent "the next record to process".
- *
+ *
* @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
* @throws Exception This method forwards exceptions.
*/
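The "incremented by 1" contract above distinguishes Flink's checkpointed offsets (last processed record) from Kafka's committed offsets (next record to read); with hypothetical values:

    // Checkpointed state stores the last processed record per partition;
    // Kafka's committed offset means "the next record to process".
    long lastProcessedOffset = 16768L;              // hypothetical checkpointed value
    long offsetToCommit = lastProcessedOffset + 1;  // what implementations commit to Kafka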
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 20411e1..9fc261e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -192,7 +193,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext) throws Exception {
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode) throws Exception {
return mock(AbstractFetcher.class);
}
@@ -200,6 +202,11 @@ public class FlinkKafkaConsumerBaseMigrationTest {
protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
return partitions;
}
+
+ @Override
+ protected boolean getIsAutoCommitEnabled() {
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 33f1b85..123c2be 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
@@ -31,6 +30,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -38,6 +38,7 @@ import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
+import org.mockito.Mockito;
import java.io.Serializable;
import java.lang.reflect.Field;
@@ -54,7 +55,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class FlinkKafkaConsumerBaseTest {
@@ -187,6 +191,7 @@ public class FlinkKafkaConsumerBaseTest {
/**
* Tests that on snapshots, states and offsets to commit to Kafka are correct
*/
+ @SuppressWarnings("unchecked")
@Test
public void checkUseFetcherWhenNoCheckpoint() throws Exception {
@@ -211,7 +216,7 @@ public class FlinkKafkaConsumerBaseTest {
@Test
@SuppressWarnings("unchecked")
- public void testSnapshotState() throws Exception {
+ public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
// --------------------------------------------------------------------
// prepare fake states
@@ -233,10 +238,14 @@ public class FlinkKafkaConsumerBaseTest {
final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
-
+
final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
+
FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
+ StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+ when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing
+ consumer.setRuntimeContext(mockRuntimeContext);
+
assertEquals(0, pendingOffsetsToCommit.size());
OperatorStateStore backend = mock(OperatorStateStore.class);
@@ -252,6 +261,8 @@ public class FlinkKafkaConsumerBaseTest {
consumer.initializeState(initializationContext);
+ consumer.open(new Configuration());
+
// checkpoint 1
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
@@ -330,12 +341,140 @@ public class FlinkKafkaConsumerBaseTest {
assertEquals(0, pendingOffsetsToCommit.size());
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception {
+ // --------------------------------------------------------------------
+ // prepare fake states
+ // --------------------------------------------------------------------
+
+ final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+ state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+ state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+ final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
+ state2.put(new KafkaTopicPartition("abc", 13), 16770L);
+ state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+
+ final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
+ state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+ state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+
+ // --------------------------------------------------------------------
+
+ final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+ when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+
+ final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+ FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
+ StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+ when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing
+ consumer.setRuntimeContext(mockRuntimeContext);
+
+ consumer.setCommitOffsetsOnCheckpoints(false); // disable offset committing
+
+ assertEquals(0, pendingOffsetsToCommit.size());
+
+ OperatorStateStore backend = mock(OperatorStateStore.class);
+
+ TestingListState<Serializable> listState = new TestingListState<>();
+
+ when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+ StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+
+ when(initializationContext.getOperatorStateStore()).thenReturn(backend);
+ when(initializationContext.isRestored()).thenReturn(false, true, true, true);
+
+ consumer.initializeState(initializationContext);
+
+ consumer.open(new Configuration());
+
+ // checkpoint 1
+ consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
+
+ HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
+
+ for (Serializable serializable : listState.get()) {
+ Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
+ snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+ }
+
+ assertEquals(state1, snapshot1);
+ assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+
+ // checkpoint 2
+ consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140));
+
+ HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
+
+ for (Serializable serializable : listState.get()) {
+ Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
+ snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+ }
+
+ assertEquals(state2, snapshot2);
+ assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+
+ // ack checkpoint 1
+ consumer.notifyCheckpointComplete(138L);
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+ // checkpoint 3
+ consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
+
+ HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
+
+ for (Serializable serializable : listState.get()) {
+ Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
+ snapshot3.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+ }
+
+ assertEquals(state3, snapshot3);
+ assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+
+ // ack checkpoint 3, subsumes number 2
+ consumer.notifyCheckpointComplete(141L);
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+
+ consumer.notifyCheckpointComplete(666); // invalid checkpoint
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+ OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+ listState = new TestingListState<>();
+ when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+ // create 500 snapshots
+ for (int i = 100; i < 600; i++) {
+ consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
+ listState.clear();
+ }
+ assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+
+ // ack the second-to-last checkpoint
+ consumer.notifyCheckpointComplete(598);
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+ // access invalid checkpoint
+ consumer.notifyCheckpointComplete(590);
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+ // and the last
+ consumer.notifyCheckpointComplete(599);
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+ }
+
// ------------------------------------------------------------------------
private static <T> FlinkKafkaConsumerBase<T> getConsumer(
AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
{
FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+ StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+ Mockito.when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+ consumer.setRuntimeContext(mockRuntimeContext);
Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
fetcherField.setAccessible(true);
@@ -369,7 +508,8 @@ public class FlinkKafkaConsumerBaseTest {
Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext) throws Exception {
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode) throws Exception {
return mock(AbstractFetcher.class);
}
@@ -379,8 +519,8 @@ public class FlinkKafkaConsumerBaseTest {
}
@Override
- public RuntimeContext getRuntimeContext() {
- return mock(StreamingRuntimeContext.class);
+ protected boolean getIsAutoCommitEnabled() {
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
index 3d7a7e4..8650dcd 100644
--- a/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
@@ -93,6 +93,24 @@ public class PropertiesUtil {
}
}
+ /**
+ * Get boolean from properties.
+ * This method returns {@code true} iff the parsed value is "true".
+ *
+ * @param config Properties
+ * @param key key in Properties
+ * @param defaultValue default value if value is not set
+ * @return default or value of key
+ */
+ public static boolean getBoolean(Properties config, String key, boolean defaultValue) {
+ String val = config.getProperty(key);
+ if (val == null) {
+ return defaultValue;
+ } else {
+ return Boolean.parseBoolean(val);
+ }
+ }
+
// ------------------------------------------------------------------------
/** Private default constructor to prevent instantiation */
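A small usage sketch of the new helper (key names and values illustrative):

    import java.util.Properties;
    import org.apache.flink.util.PropertiesUtil;

    Properties props = new Properties();
    props.setProperty("enable.auto.commit", "false");

    // key present: only the exact string "true" parses to true -> false
    boolean autoCommit = PropertiesUtil.getBoolean(props, "enable.auto.commit", true);

    // key absent: the default value is returned -> true
    boolean fallback = PropertiesUtil.getBoolean(props, "auto.commit.enable", true);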