You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/14 17:00:33 UTC

[1/2] flink git commit: [FLINK-5949] [yarn] Don't check Kerberos credentials for non-Kerberos authentication methods

Repository: flink
Updated Branches:
  refs/heads/master dabeb74c1 -> 90c7415b0


[FLINK-5949] [yarn] Don't check Kerberos credentials for non-Kerberos authentication methods

This closes #3528.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87779ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87779ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87779ad0

Branch: refs/heads/master
Commit: 87779ad034957923e316c1100becff8288e5c9ca
Parents: dabeb74
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Mar 14 13:42:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Mar 15 00:30:47 2017 +0800

----------------------------------------------------------------------
 .../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87779ad0/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 483e279..8928f0a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -414,10 +414,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 				// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
 				// so we check only in ticket cache scenario.
 				boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+
 				UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-				if (useTicketCache && !loginUser.hasKerberosCredentials()) {
-					LOG.error("Hadoop security is enabled but the login user does not have Kerberos credentials");
-					throw new RuntimeException("Hadoop security is enabled but the login user " +
+				if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
+						&& useTicketCache && !loginUser.hasKerberosCredentials()) {
+					LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
+					throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
 							"does not have Kerberos credentials");
 				}
 			}


[2/2] flink git commit: [FLINK-3398] [kafka] Allow disabling offset committing for FlinkKafkaConsumer

Posted by tz...@apache.org.
[FLINK-3398] [kafka] Allow disabling offset committing for FlinkKafkaConsumer

This closes #3527.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90c7415b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90c7415b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90c7415b

Branch: refs/heads/master
Commit: 90c7415b03b0b1ec28fec9dcd68e426321ad7270
Parents: 87779ad
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Mar 13 13:49:08 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Mar 15 00:58:17 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  28 ++++
 .../connectors/kafka/FlinkKafkaConsumer010.java |   8 +-
 .../kafka/internal/Kafka010Fetcher.java         |   2 -
 .../connectors/kafka/Kafka010FetcherTest.java   |   3 -
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  21 ++-
 .../connectors/kafka/Kafka08ITCase.java         |   3 +
 .../connectors/kafka/KafkaConsumer08Test.java   |  31 +++-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  15 +-
 .../kafka/internal/Kafka09Fetcher.java          |   7 -
 .../connectors/kafka/Kafka09FetcherTest.java    |   3 -
 .../kafka/FlinkKafkaConsumerBase.java           | 149 +++++++++++++-----
 .../kafka/config/OffsetCommitMode.java          |  36 +++++
 .../kafka/config/OffsetCommitModes.java         |  47 ++++++
 .../kafka/internals/AbstractFetcher.java        |   8 +-
 .../FlinkKafkaConsumerBaseMigrationTest.java    |   9 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 154 ++++++++++++++++++-
 .../org/apache/flink/util/PropertiesUtil.java   |  18 +++
 17 files changed, 458 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 6d58b0c..60a8039 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -292,6 +292,34 @@ Flink on YARN supports automatic restart of lost YARN containers.
 
 If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
 
+### Kafka Consumers Offset Committing Behaviour Configuration
+
+The Flink Kafka Consumer allows configuring the behaviour of how offsets
+are committed back to Kafka brokers (or Zookeeper in 0.8). Note that the
+Flink Kafka Consumer does not rely on the committed offsets for fault
+tolerance guarantees. The committed offsets are only a means to expose
+the consumer's progress for monitoring purposes.
+
+The way to configure offset commit behaviour is different, depending on
+whether or not checkpointing is enabled for the job.
+
+ - *Checkpointing disabled:* if checkpointing is disabled, the Flink Kafka
+ Consumer relies on the automatic periodic offset committing capability
+ of the internally used Kafka clients. Therefore, to disable or enable offset
+ committing, simply set the `enable.auto.commit` (or `auto.commit.enable`
+ for Kafka 0.8) / `auto.commit.interval.ms` keys to appropriate values
+ in the provided `Properties` configuration.
+ 
+ - *Checkpointing enabled:* if checkpointing is enabled, the Flink Kafka
+ Consumer will commit the offsets stored in the checkpointed states when
+ the checkpoints are completed. This ensures that the committed offsets
+ in Kafka brokers are consistent with the offsets in the checkpointed states.
+ Users can choose to disable or enable offset committing by calling the
+ `setCommitOffsetsOnCheckpoints(boolean)` method on the consumer (by default,
+ the behaviour is `true`).
+ Note that in this scenario, the automatic periodic offset committing
+ settings in `Properties` are completely ignored.
+
 ### Kafka Consumers and Timestamp Extraction/Watermark Emission
 
 In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 716fa19..9e06d6e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -20,12 +20,14 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
 
 import java.util.Collections;
@@ -131,9 +133,10 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext) throws Exception {
+			StreamingRuntimeContext runtimeContext,
+			OffsetCommitMode offsetCommitMode) throws Exception {
 
-		boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
+		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
 
 		return new Kafka010Fetcher<>(
 				sourceContext,
@@ -143,7 +146,6 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 				runtimeContext.getProcessingTimeService(),
 				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 				runtimeContext.getUserCodeClassLoader(),
-				runtimeContext.isCheckpointingEnabled(),
 				runtimeContext.getTaskNameWithSubtasks(),
 				runtimeContext.getMetricGroup(),
 				deserializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index da6ecd0..586d841 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -52,7 +52,6 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 			ProcessingTimeService processingTimeProvider,
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
-			boolean enableCheckpointing,
 			String taskNameWithSubtasks,
 			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
@@ -68,7 +67,6 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 				processingTimeProvider,
 				autoWatermarkInterval,
 				userCodeClassLoader,
-				enableCheckpointing,
 				taskNameWithSubtasks,
 				metricGroup,
 				deserializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 17ba712..2d0551d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -130,7 +130,6 @@ public class Kafka010FetcherTest {
 				new TestProcessingTimeService(),
 				10,
 				getClass().getClassLoader(),
-				false, /* checkpointing */
 				"taskname-with-subtask",
 				new UnregisteredMetricsGroup(),
 				schema,
@@ -267,7 +266,6 @@ public class Kafka010FetcherTest {
 				new TestProcessingTimeService(),
 				10,
 				getClass().getClassLoader(),
-				false, /* checkpointing */
 				"taskname-with-subtask",
 				new UnregisteredMetricsGroup(),
 				schema,
@@ -382,7 +380,6 @@ public class Kafka010FetcherTest {
 				new TestProcessingTimeService(),
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
-				true, /* checkpointing */
 				"task_name",
 				new UnregisteredMetricsGroup(),
 				schema,

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index bf7ed02..858a790 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -27,6 +27,7 @@ import kafka.javaapi.consumer.SimpleConsumer;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -112,9 +113,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	/** The properties to parametrize the Kafka consumer and ZooKeeper client */ 
 	private final Properties kafkaProperties;
 
-	/** The interval in which to automatically commit (-1 if deactivated) */
-	private final long autoCommitInterval;
-
 	// ------------------------------------------------------------------------
 
 	/**
@@ -187,8 +185,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 
 		// eagerly check for invalid "auto.offset.reset" values before launching the job
 		validateAutoOffsetResetValue(props);
-
-		this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000);
 	}
 
 	@Override
@@ -197,9 +193,14 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext) throws Exception {
+			StreamingRuntimeContext runtimeContext,
+			OffsetCommitMode offsetCommitMode) throws Exception {
+
+		boolean useMetrics = !PropertiesUtil.getBoolean(kafkaProperties, KEY_DISABLE_METRICS, false);
 
-		boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));
+		long autoCommitInterval = (offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC)
+				? PropertiesUtil.getLong(kafkaProperties, "auto.commit.interval.ms", 60000)
+				: -1; // this disables the periodic offset committer thread in the fetcher
 
 		return new Kafka08Fetcher<>(
 				sourceContext,
@@ -232,6 +233,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 		return partitionInfos;
 	}
 
+	@Override
+	protected boolean getIsAutoCommitEnabled() {
+		return PropertiesUtil.getBoolean(kafkaProperties, "auto.commit.enable", true) &&
+				PropertiesUtil.getLong(kafkaProperties, "auto.commit.interval.ms", 60000) > 0;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Kafka / ZooKeeper communication utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 2e7c368..f5cb8c0 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -202,6 +202,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		// at least once.
 		Properties readProps = new Properties();
 		readProps.putAll(standardProps);
+
+		// make sure that auto commit is enabled in the properties
+		readProps.setProperty("auto.commit.enable", "true");
 		readProps.setProperty("auto.commit.interval.ms", "500");
 
 		// read so that the offset can be committed to ZK

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
index 5ae74d7..83cdd90 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -21,21 +21,36 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
 import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Matchers;
+import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -96,7 +111,12 @@ public class KafkaConsumer08Test {
 			props.setProperty("group.id", "non-existent-group");
 			props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "1");
 
-			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
+				Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+			StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+			Mockito.when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+			consumer.setRuntimeContext(mockRuntimeContext);
+
 			consumer.open(new Configuration());
 			fail();
 		}
@@ -118,8 +138,13 @@ public class KafkaConsumer08Test {
 			String zookeeperConnect = "localhost:56794";
 			String groupId = "non-existent-group";
 			Properties props = createKafkaProps(zookeeperConnect, unknownHost, groupId);
-			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
-					new SimpleStringSchema(), props);
+
+			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
+				Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+			StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+			Mockito.when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+			consumer.setRuntimeContext(mockRuntimeContext);
+
 			consumer.open(new Configuration());
 			fail();
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index c7236a2..d0284ce 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -20,12 +20,14 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -174,9 +176,10 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 			Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext) throws Exception {
+			StreamingRuntimeContext runtimeContext,
+			OffsetCommitMode offsetCommitMode) throws Exception {
 
-		boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
+		boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
 
 		return new Kafka09Fetcher<>(
 				sourceContext,
@@ -186,14 +189,12 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 				runtimeContext.getProcessingTimeService(),
 				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
 				runtimeContext.getUserCodeClassLoader(),
-				runtimeContext.isCheckpointingEnabled(),
 				runtimeContext.getTaskNameWithSubtasks(),
 				runtimeContext.getMetricGroup(),
 				deserializer,
 				properties,
 				pollTimeout,
 				useMetrics);
-		
 	}
 
 	@Override
@@ -229,6 +230,12 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 		return partitions;
 	}
 
+	@Override
+	protected boolean getIsAutoCommitEnabled() {
+		return PropertiesUtil.getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
+				PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index c389486..1c87542 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -76,7 +75,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 			ProcessingTimeService processingTimeProvider,
 			long autoWatermarkInterval,
 			ClassLoader userCodeClassLoader,
-			boolean enableCheckpointing,
 			String taskNameWithSubtasks,
 			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
@@ -99,11 +97,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 
 		final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
 		addOffsetStateGauge(kafkaMetricGroup);
-
-		// if checkpointing is enabled, we are not automatically committing to Kafka.
-		kafkaProperties.setProperty(
-				ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-				Boolean.toString(!enableCheckpointing));
 		
 		this.consumerThread = new KafkaConsumerThread(
 				LOG,

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 49144e6..6e13db2 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -130,7 +130,6 @@ public class Kafka09FetcherTest {
 				new TestProcessingTimeService(),
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
-				true, /* checkpointing */
 				"task_name",
 				new UnregisteredMetricsGroup(),
 				schema,
@@ -267,7 +266,6 @@ public class Kafka09FetcherTest {
 				new TestProcessingTimeService(),
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
-				true, /* checkpointing */
 				"task_name",
 				new UnregisteredMetricsGroup(),
 				schema,
@@ -382,7 +380,6 @@ public class Kafka09FetcherTest {
 				new TestProcessingTimeService(),
 				10, /* watermark interval */
 				this.getClass().getClassLoader(),
-				true, /* checkpointing */
 				"task_name",
 				new UnregisteredMetricsGroup(),
 				schema,

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 027751c..6858509 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -37,6 +37,8 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -103,8 +105,21 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
 
+	/**
+	 * User-set flag determining whether or not to commit on checkpoints.
+	 * Note: this flag does not represent the final offset commit mode.
+	 */
+	private boolean enableCommitOnCheckpoints = true;
+
+	/**
+	 * The offset commit mode for the consumer.
+	 * The value of this can only be determined in {@link FlinkKafkaConsumerBase#open(Configuration)} since it depends
+	 * on whether or not checkpointing is enabled for the job.
+	 */
+	private OffsetCommitMode offsetCommitMode;
+
 	/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */
-	protected StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
 
 	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS} */
 	protected Map<KafkaTopicPartition, Long> specificStartupOffsets;
@@ -161,7 +176,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
 	 * 
 	 * @param assigner The timestamp assigner / watermark generator to use.
-	 * @return The consumer object, to allow function chaining.   
+	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
 		checkNotNull(assigner);
@@ -196,7 +211,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
 	 *
 	 * @param assigner The timestamp assigner / watermark generator to use.
-	 * @return The consumer object, to allow function chaining.   
+	 * @return The consumer object, to allow function chaining.
 	 */
 	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
 		checkNotNull(assigner);
@@ -214,6 +229,20 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	/**
+	 * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
+	 *
+	 * This setting will only have effect if checkpointing is enabled for the job.
+	 * If checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
+	 * property settings will be used.
+	 *
+	 * @return The consumer object, to allow function chaining.
+	 */
+	public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
+		this.enableCommitOnCheckpoints = commitOnCheckpoints;
+		return this;
+	}
+
+	/**
 	 * Specifies the consumer to start reading from the earliest offset for all partitions.
 	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
 	 *
@@ -295,6 +324,29 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 	@Override
 	public void open(Configuration configuration) {
+		// determine the offset commit mode
+		offsetCommitMode = OffsetCommitModes.fromConfiguration(
+				getIsAutoCommitEnabled(),
+				enableCommitOnCheckpoints,
+				((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
+
+		switch (offsetCommitMode) {
+			case ON_CHECKPOINTS:
+				LOG.info("Consumer subtask {} will commit offsets back to Kafka on completed checkpoints.",
+						getRuntimeContext().getIndexOfThisSubtask());
+				break;
+			case KAFKA_PERIODIC:
+				LOG.info("Consumer subtask {} will commit offsets back to Kafka periodically using the Kafka client's auto commit.",
+						getRuntimeContext().getIndexOfThisSubtask());
+				break;
+			default:
+			case DISABLED:
+				LOG.info("Consumer subtask {} has disabled offset committing back to Kafka." +
+						" This does not compromise Flink's checkpoint integrity.",
+						getRuntimeContext().getIndexOfThisSubtask());
+		}
+
+		// initialize subscribed partitions
 		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
 
 		subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
@@ -377,9 +429,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 			// create the fetcher that will communicate with the Kafka brokers
 			final AbstractFetcher<T, ?> fetcher = createFetcher(
-					sourceContext, subscribedPartitionsToStartOffsets,
-					periodicWatermarkAssigner, punctuatedWatermarkAssigner,
-					(StreamingRuntimeContext) getRuntimeContext());
+					sourceContext,
+					subscribedPartitionsToStartOffsets,
+					periodicWatermarkAssigner,
+					punctuatedWatermarkAssigner,
+					(StreamingRuntimeContext) getRuntimeContext(),
+					offsetCommitMode);
 
 			// publish the reference, for snapshot-, commit-, and cancel calls
 			// IMPORTANT: We can only do that now, because only now will calls to
@@ -485,15 +540,19 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 					offsetsStateForCheckpoint.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
 				}
 
-				// the map cannot be asynchronously updated, because only one checkpoint call can happen
-				// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-				pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
+				if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+					// the map cannot be asynchronously updated, because only one checkpoint call can happen
+					// on this function at a time: either snapshotState() or notifyCheckpointComplete()
+					pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
+				}
 			} else {
 				HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
 
-				// the map cannot be asynchronously updated, because only one checkpoint call can happen
-				// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-				pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+				if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+					// the map cannot be asynchronously updated, because only one checkpoint call can happen
+					// on this function at a time: either snapshotState() or notifyCheckpointComplete()
+					pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+				}
 
 				for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
 					offsetsStateForCheckpoint.add(
@@ -501,9 +560,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				}
 			}
 
-			// truncate the map of pending offsets to commit, to prevent infinite growth
-			while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
-				pendingOffsetsToCommit.remove(0);
+			if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+				// truncate the map of pending offsets to commit, to prevent infinite growth
+				while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+					pendingOffsetsToCommit.remove(0);
+				}
 			}
 		}
 	}
@@ -533,39 +594,40 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
 			return;
 		}
-		
-		// only one commit operation must be in progress
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
-		}
 
-		try {
-			final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
-			if (posInMap == -1) {
-				LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
-				return;
+		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+			// only one commit operation must be in progress
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
 			}
 
-			@SuppressWarnings("unchecked")
-			HashMap<KafkaTopicPartition, Long> offsets =
+			try {
+				final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+				if (posInMap == -1) {
+					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+					return;
+				}
+
+				@SuppressWarnings("unchecked")
+				HashMap<KafkaTopicPartition, Long> offsets =
 					(HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
 
-			// remove older checkpoints in map
-			for (int i = 0; i < posInMap; i++) {
-				pendingOffsetsToCommit.remove(0);
-			}
+				// remove older checkpoints in map
+				for (int i = 0; i < posInMap; i++) {
+					pendingOffsetsToCommit.remove(0);
+				}
 
-			if (offsets == null || offsets.size() == 0) {
-				LOG.debug("Checkpoint state was empty.");
-				return;
-			}
-			fetcher.commitInternalOffsetsToKafka(offsets);
-		}
-		catch (Exception e) {
-			if (running) {
-				throw e;
+				if (offsets == null || offsets.size() == 0) {
+					LOG.debug("Checkpoint state was empty.");
+					return;
+				}
+				fetcher.commitInternalOffsetsToKafka(offsets);
+			} catch (Exception e) {
+				if (running) {
+					throw e;
+				}
+				// else ignore exception if we are no longer running
 			}
-			// else ignore exception if we are no longer running
 		}
 	}
 
@@ -592,9 +654,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext) throws Exception;
+			StreamingRuntimeContext runtimeContext,
+			OffsetCommitMode offsetCommitMode) throws Exception;
 
 	protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> topics);
+
+	protected abstract boolean getIsAutoCommitEnabled();
 	
 	// ------------------------------------------------------------------------
 	//  ResultTypeQueryable methods 

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
new file mode 100644
index 0000000..8bb75b4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+/**
+ * The offset commit mode represents the behaviour of how offsets are externally committed
+ * back to Kafka brokers / Zookeeper.
+ *
+ * The exact value of this is determined at runtime in the consumer subtasks.
+ */
+public enum OffsetCommitMode {
+
+	/** Completely disable offset committing. */
+	DISABLED,
+
+	/** Commit offsets back to Kafka only when checkpoints are completed. */
+	ON_CHECKPOINTS,
+
+	/** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
+	KAFKA_PERIODIC;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
new file mode 100644
index 0000000..9e1d9d5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+/**
+ * Utilities for {@link OffsetCommitMode}.
+ */
+public class OffsetCommitModes {
+
+	/**
+	 * Determine the offset commit mode using several configuration values.
+	 *
+	 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
+	 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
+	 * @param enableCheckpointing whether or not checkpointing is enabled for the consumer.
+	 *
+	 * @return the offset commit mode to use, based on the configuration values.
+	 */
+	public static OffsetCommitMode fromConfiguration(
+			boolean enableAutoCommit,
+			boolean enableCommitOnCheckpoint,
+			boolean enableCheckpointing) {
+
+		if (enableCheckpointing) {
+			// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
+			return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
+		} else {
+			// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
+			return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index b8ac980..0b311a9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
@@ -166,11 +167,14 @@ public abstract class AbstractFetcher<T, KPH> {
 
 	/**
 	 * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
-	 * older Kafka versions). The given offsets are the internal checkpointed offsets, representing
+	 * older Kafka versions). This method is only ever called when the offset commit mode of
+	 * the consumer is {@link OffsetCommitMode#ON_CHECKPOINTS}.
+	 *
+	 * The given offsets are the internal checkpointed offsets, representing
 	 * the last processed record of each partition. Version-specific implementations of this method
 	 * need to hold the contract that the given offsets must be incremented by 1 before
 	 * committing them, so that committed offsets to Kafka represent "the next record to process".
-	 * 
+	 *
 	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
 	 * @throws Exception This method forwards exceptions.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 20411e1..9fc261e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -192,7 +193,8 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 				Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-				StreamingRuntimeContext runtimeContext) throws Exception {
+				StreamingRuntimeContext runtimeContext,
+				OffsetCommitMode offsetCommitMode) throws Exception {
 			return mock(AbstractFetcher.class);
 		}
 
@@ -200,6 +202,11 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
 			return partitions;
 		}
+
+		@Override
+		protected boolean getIsAutoCommitEnabled() {
+			return false;
+		}
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 33f1b85..123c2be 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -31,6 +30,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -38,6 +38,7 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Matchers;
+import org.mockito.Mockito;
 
 import java.io.Serializable;
 import java.lang.reflect.Field;
@@ -54,7 +55,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class FlinkKafkaConsumerBaseTest {
@@ -187,6 +191,7 @@ public class FlinkKafkaConsumerBaseTest {
 	/**
 	 * Tests that on snapshots, states and offsets to commit to Kafka are correct
 	 */
+	@SuppressWarnings("unchecked")
 	@Test
 	public void checkUseFetcherWhenNoCheckpoint() throws Exception {
 
@@ -211,7 +216,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testSnapshotState() throws Exception {
+	public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
 
 		// --------------------------------------------------------------------
 		//   prepare fake states
@@ -233,10 +238,14 @@ public class FlinkKafkaConsumerBaseTest {
 		
 		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
 		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
-			
+
 		final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-	
+
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
+		StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+		when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing
+		consumer.setRuntimeContext(mockRuntimeContext);
+
 		assertEquals(0, pendingOffsetsToCommit.size());
 
 		OperatorStateStore backend = mock(OperatorStateStore.class);
@@ -252,6 +261,8 @@ public class FlinkKafkaConsumerBaseTest {
 
 		consumer.initializeState(initializationContext);
 
+		consumer.open(new Configuration());
+
 		// checkpoint 1
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
 
@@ -330,12 +341,140 @@ public class FlinkKafkaConsumerBaseTest {
 		assertEquals(0, pendingOffsetsToCommit.size());
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception {
+		// --------------------------------------------------------------------
+		//   prepare fake states
+		// --------------------------------------------------------------------
+
+		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+		final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
+		state2.put(new KafkaTopicPartition("abc", 13), 16770L);
+		state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+
+		final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
+		state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+		state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+
+		// --------------------------------------------------------------------
+
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+
+		final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
+		StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+		when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing
+		consumer.setRuntimeContext(mockRuntimeContext);
+
+		consumer.setCommitOffsetsOnCheckpoints(false); // disable offset committing
+
+		assertEquals(0, pendingOffsetsToCommit.size());
+
+		OperatorStateStore backend = mock(OperatorStateStore.class);
+
+		TestingListState<Serializable> listState = new TestingListState<>();
+
+		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
+
+		when(initializationContext.getOperatorStateStore()).thenReturn(backend);
+		when(initializationContext.isRestored()).thenReturn(false, true, true, true);
+
+		consumer.initializeState(initializationContext);
+
+		consumer.open(new Configuration());
+
+		// checkpoint 1
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
+
+		HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
+
+		for (Serializable serializable : listState.get()) {
+			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
+			snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+		}
+
+		assertEquals(state1, snapshot1);
+		assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+
+		// checkpoint 2
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140));
+
+		HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
+
+		for (Serializable serializable : listState.get()) {
+			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
+			snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+		}
+
+		assertEquals(state2, snapshot2);
+		assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+
+		// ack checkpoint 1
+		consumer.notifyCheckpointComplete(138L);
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+		// checkpoint 3
+		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
+
+		HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
+
+		for (Serializable serializable : listState.get()) {
+			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
+			snapshot3.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
+		}
+
+		assertEquals(state3, snapshot3);
+		assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+
+		// ack checkpoint 3, subsumes number 2
+		consumer.notifyCheckpointComplete(141L);
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+
+		consumer.notifyCheckpointComplete(666); // invalid checkpoint
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
+		listState = new TestingListState<>();
+		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+		// create 500 snapshots
+		for (int i = 100; i < 600; i++) {
+			consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
+			listState.clear();
+		}
+		assertEquals(0, pendingOffsetsToCommit.size()); // pending offsets to commit should not be updated
+
+		// commit only the second last
+		consumer.notifyCheckpointComplete(598);
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+		// access invalid checkpoint
+		consumer.notifyCheckpointComplete(590);
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+
+		// and the last
+		consumer.notifyCheckpointComplete(599);
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
 			AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
 	{
 		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+		StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
+		Mockito.when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
+		consumer.setRuntimeContext(mockRuntimeContext);
 
 		Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
 		fetcherField.setAccessible(true);
@@ -369,7 +508,8 @@ public class FlinkKafkaConsumerBaseTest {
 				Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets,
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-				StreamingRuntimeContext runtimeContext) throws Exception {
+				StreamingRuntimeContext runtimeContext,
+				OffsetCommitMode offsetCommitMode) throws Exception {
 			return mock(AbstractFetcher.class);
 		}
 
@@ -379,8 +519,8 @@ public class FlinkKafkaConsumerBaseTest {
 		}
 
 		@Override
-		public RuntimeContext getRuntimeContext() {
-			return mock(StreamingRuntimeContext.class);
+		protected boolean getIsAutoCommitEnabled() {
+			return false;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/90c7415b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
index 3d7a7e4..8650dcd 100644
--- a/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
@@ -93,6 +93,24 @@ public class PropertiesUtil {
 		}
 	}
 
+	/**
+	 * Get boolean from properties.
+	 * This method returns {@code true} iff the parsed value is "true" (ignoring case).
+	 *
+	 * @param config Properties
+	 * @param key key in Properties
+	 * @param defaultValue default value if value is not set
+	 * @return default or value of key
+	 */
+	public static boolean getBoolean(Properties config, String key, boolean defaultValue) {
+		String val = config.getProperty(key);
+		if (val == null) {
+			return defaultValue;
+		} else {
+			return Boolean.parseBoolean(val);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	
 	/** Private default constructor to prevent instantiation */