You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/20 15:35:00 UTC

flink git commit: [FLINK-2974] Add periodic offset committer for Kafka when checkpointing is disabled

Repository: flink
Updated Branches:
  refs/heads/master 8dc70f2e7 -> 5864e4fd4


[FLINK-2974] Add periodic offset committer for Kafka when checkpointing is disabled

This closes #1341


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5864e4fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5864e4fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5864e4fd

Branch: refs/heads/master
Commit: 5864e4fd4368f1af6e1bc623c91b8e719693471d
Parents: 8dc70f2
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Nov 6 17:06:02 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Nov 20 15:34:42 2015 +0100

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |   1 +
 docs/setup/yarn_setup.md                        |   2 +-
 .../util/AbstractRuntimeUDFContext.java         |   1 +
 .../connectors/kafka/FlinkKafkaConsumer.java    | 111 +++++++++++++++++--
 .../connectors/kafka/internals/Fetcher.java     |  12 +-
 .../kafka/internals/LegacyFetcher.java          |   9 +-
 .../connectors/kafka/KafkaConsumerTestBase.java |  74 +++++++++----
 .../streaming/connectors/kafka/KafkaITCase.java |   8 +-
 .../kafka/testutils/MockRuntimeContext.java     |  24 +++-
 .../api/operators/StreamingRuntimeContext.java  |  33 ++++++
 10 files changed, 234 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index db1f3a7..626df10 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -3499,6 +3499,7 @@ Also note that Flink can only restart the topology if enough processing slots ar
 So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
 
 #### Kafka Producer
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index 7f6950c..12f1986 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -60,7 +60,7 @@ Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management
 - at least Apache Hadoop 2.2
 - HDFS (Hadoop Distributed File System) (or another distributed file system supported by Hadoop)
 
-If you have troubles using the Flink YARN client, have a look in the [FAQ section]({{ site.baseurl }}/faq.html).
+If you have troubles using the Flink YARN client, have a look in the [FAQ section](http://flink.apache.org/faq.html#yarn-deployment).
 
 ### Start Flink Session
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index be8ac9d..34ec0b5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -175,4 +175,5 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 		throw new UnsupportedOperationException(
 				"This state is only accessible by functions executed on a KeyedStream");
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index c7ae1cc..e701639 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
@@ -233,7 +234,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	private transient long[] restoreToOffset;
 	
 	private volatile boolean running = true;
-	
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -258,8 +259,8 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		this.offsetStore = checkNotNull(offsetStore);
 		this.fetcherType = checkNotNull(fetcherType);
 
-		if(fetcherType == FetcherType.NEW_HIGH_LEVEL) {
-			throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 is not yet " +
+		if (fetcherType == FetcherType.NEW_HIGH_LEVEL) {
+			throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 / 0.9.0 is not yet " +
 					"supported in Flink");
 		}
 		if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) {
@@ -290,9 +291,6 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 			}
 		}
 		LOG.info("Topic {} has {} partitions", topic, partitions.length);
-
-		// make sure that we take care of the committing
-		props.setProperty("enable.auto.commit", "false");
 	}
 
 	// ------------------------------------------------------------------------
@@ -302,7 +300,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
-		
+
 		final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
 		final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
 		
@@ -374,12 +372,33 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 			// no restore request. Let the offset handler take care of the initial offset seeking
 			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
 		}
+
+
 	}
 
 	@Override
 	public void run(SourceContext<T> sourceContext) throws Exception {
 		if (fetcher != null) {
+			// For non-checkpointed sources, a thread which periodically commits the current offset into ZK.
+			PeriodicOffsetCommitter offsetCommitter = null;
+
+			// check whether we need to start the periodic checkpoint committer
+			StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
+			if (!streamingRuntimeContext.isCheckpointingEnabled()) {
+				// we use Kafka's own configuration parameter key for this.
+				// Note that the default configuration value in Kafka is 60 * 1000, so we use the
+				// same here.
+				long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
+				offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
+				offsetCommitter.start();
+				LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
+			}
+
 			fetcher.run(sourceContext, valueDeserializer, lastOffsets);
+
+			if (offsetCommitter != null) {
+				offsetCommitter.close();
+			}
 		}
 		else {
 			// this source never completes
@@ -419,7 +438,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 				LOG.warn("Error while closing Kafka connector data fetcher", e);
 			}
 		}
-		
+
 		OffsetHandler offsetHandler = this.offsetHandler;
 		this.offsetHandler = null;
 		if (offsetHandler != null) {
@@ -430,6 +449,8 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 				LOG.warn("Error while closing Kafka connector offset handler", e);
 			}
 		}
+
+
 	}
 
 	@Override
@@ -567,6 +588,69 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		}
 		return partitionsToSub;
 	}
+
+	/**
+	 * Thread to periodically commit the current read offset into Zookeeper.
+	 */
+	private static class PeriodicOffsetCommitter extends Thread {
+		private final long commitInterval;
+		private final FlinkKafkaConsumer consumer;
+		private volatile boolean running = true;
+
+		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
+			this.commitInterval = commitInterval;
+			this.consumer = consumer;
+		}
+
+		@Override
+		public void run() {
+			try {
+				while (running) {
+					try {
+						Thread.sleep(commitInterval);
+						//  ------------  commit current offsets ----------------
+
+						// create copy of current offsets
+						long[] currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
+
+						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
+						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
+							int partition = tp.partition();
+							long offset = currentOffsets[partition];
+							long lastCommitted = consumer.commitedOffsets[partition];
+
+							if (offset != OFFSET_NOT_SET) {
+								if (offset > lastCommitted) {
+									offsetsToCommit.put(tp, offset);
+									LOG.debug("Committing offset {} for partition {}", offset, partition);
+								} else {
+									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+								}
+							}
+						}
+
+						consumer.offsetHandler.commit(offsetsToCommit);
+					} catch (InterruptedException e) {
+						if (running) {
+							// throw unexpected interruption
+							throw e;
+						}
+						// looks like the thread is being closed. Leave loop
+						break;
+					}
+				}
+			} catch (Throwable t) {
+				LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
+				consumer.fetcher.stopWithError(t);
+			}
+		}
+
+		public void close() {
+			this.running = false;
+			this.interrupt();
+		}
+
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Kafka / ZooKeeper communication utilities
@@ -657,10 +741,19 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		return partitions;
 	}
 
+	/**
+	 * Turn a broker instance into a node instance
+	 * @param broker broker instance
+	 * @return Node representing the given broker
+	 */
 	private static Node brokerToNode(Broker broker) {
 		return new Node(broker.id(), broker.host(), broker.port());
 	}
-	
+
+	/**
+	 * Validate the ZK configuration, checking for required parameters
+	 * @param props Properties to check
+	 */
 	protected static void validateZooKeeperConfig(Properties props) {
 		if (props.getProperty("zookeeper.connect") == null) {
 			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");

http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
index 4345926..063d089 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
@@ -64,12 +64,11 @@ public interface Fetcher {
 	 * 
 	 * @param sourceContext The source context to emit elements to.
 	 * @param valueDeserializer The deserializer to decode the raw values with.
-	 * @param lastOffsets The array into which to store the offsets foe which elements are emitted. 
+	 * @param lastOffsets The array into which to store the offsets for which elements are emitted (operator state)
 	 * 
 	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
 	 */
-	<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, 
-					long[] lastOffsets) throws Exception;
+	<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets) throws Exception;
 	
 	/**
 	 * Set the next offset to read from for the given partition.
@@ -80,4 +79,11 @@ public interface Fetcher {
 	 * @param offsetToRead To offset to seek to.
 	 */
 	void seek(TopicPartition topicPartition, long offsetToRead);
+
+	/**
+	 * Exit run loop with given error and release all resources.
+	 *
+	 * @param t Error cause
+	 */
+	void stopWithError(Throwable t);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index c4ba103..212ba7d 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -122,7 +122,7 @@ public class LegacyFetcher implements Fetcher {
 	}
 
 	@Override
-	public <T> void run(SourceFunction.SourceContext<T> sourceContext, 
+	public <T> void run(SourceFunction.SourceContext<T> sourceContext,
 						DeserializationSchema<T> valueDeserializer,
 						long[] lastOffsets) throws Exception {
 		
@@ -258,7 +258,8 @@ public class LegacyFetcher implements Fetcher {
 	 * 
 	 * @param error The error to report.
 	 */
-	void onErrorInFetchThread(Throwable error) {
+	@Override
+	public void stopWithError(Throwable error) {
 		if (this.error.compareAndSet(null, error)) {
 			// we are the first to report an error
 			if (mainThread != null) {
@@ -445,7 +446,7 @@ public class LegacyFetcher implements Fetcher {
 								final T value = valueDeserializer.deserialize(valueByte);
 								final long offset = msg.offset();
 										
-								synchronized (sourceContext.getCheckpointLock()) {
+								synchronized (this.sourceContext.getCheckpointLock()) {
 									sourceContext.collect(value);
 									offsetsState[partition] = offset;
 								}
@@ -464,7 +465,7 @@ public class LegacyFetcher implements Fetcher {
 			}
 			catch (Throwable t) {
 				// report to the main thread
-				owner.onErrorInFetchThread(t);
+				owner.stopWithError(t);
 			}
 			finally {
 				// end of run loop. close connection to consumer

http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ffb6818..3e8154f 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -181,12 +181,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	/**
 	 * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
 	 *
-	 * This test is only applicable if Teh Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
+	 * This test is only applicable if the Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
 	 */
 	public void runOffsetInZookeeperValidationTest() throws Exception {
-		LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
-
-		final String topicName = "testOffsetHacking";
+		final String topicName = "testOffsetInZK";
 		final int parallelism = 3;
 
 		createTestTopic(topicName, parallelism, 1);
@@ -223,8 +221,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
 		assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-		assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-		assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+		assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100));
+		assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100));
 
 		LOG.info("Manipulating offsets");
 
@@ -239,8 +237,56 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		readSequence(env3, standardProps, parallelism, topicName, 50, 50);
 
 		deleteTestTopic(topicName);
+	}
+
+	public void runOffsetAutocommitTest() throws Exception {
+		final String topicName = "testOffsetAutocommit";
+		final int parallelism = 3;
+
+		createTestTopic(topicName, parallelism, 1);
+
+		StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env1.getConfig().disableSysoutLogging();
+		env1.setNumberOfExecutionRetries(0);
+		env1.setParallelism(parallelism);
+
+		StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		// NOTE: We are not enabling the checkpointing!
+		env2.getConfig().disableSysoutLogging();
+		env2.setNumberOfExecutionRetries(0);
+		env2.setParallelism(parallelism);
+
+
+		// write a sequence from 0 to 99 to each of the 3 partitions.
+		writeSequence(env1, topicName, 100, parallelism);
+
+
+		// the readSequence operation sleeps for 20 ms between each record.
+		// setting a delay of 25*20 = 500 for the commit interval makes
+		// sure that we commit roughly 3-4 times while reading, however
+		// at least once.
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.commit.interval.ms", "500");
+
+		// read so that the offset can be committed to ZK
+		readSequence(env2, readProps, parallelism, topicName, 100, 0);
+
+		// get the offset
+		ZkClient zkClient = createZookeeperClient();
+
+		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
+		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
+		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
 
-		LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
+		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
+
+		// ensure that the offset has been committed
+		assertTrue("Offset of o1=" + o1 + " was not in range", o1 > 0 && o1 <= 100);
+		assertTrue("Offset of o2=" + o2 + " was not in range", o2 > 0 && o2 <= 100);
+		assertTrue("Offset of o3=" + o3 + " was not in range", o3 > 0 && o3 <= 100);
+
+		deleteTestTopic(topicName);
 	}
 
 	/**
@@ -257,8 +303,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 */
 	@RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
 	public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
-		LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
-
 		final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
 		final int parallelism = 3;
 		final int elementsPerPartition = 100;
@@ -361,8 +405,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			throw e;
 		}
 
-		LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
-
 		deleteTestTopic(topic);
 	}
 
@@ -371,7 +413,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * Flink sources.
 	 */
 	public void runOneToOneExactlyOnceTest() throws Exception {
-		LOG.info("Starting runOneToOneExactlyOnceTest()");
 
 		final String topic = "oneToOneTopic";
 		final int parallelism = 5;
@@ -416,8 +457,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * one Flink source will read multiple Kafka partitions.
 	 */
 	public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
-		LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
-
 		final String topic = "oneToManyTopic";
 		final int numPartitions = 5;
 		final int numElementsPerPartition = 1000;
@@ -463,8 +502,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * that some Flink sources will read no partitions.
 	 */
 	public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
-		LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
-
 		final String topic = "manyToOneTopic";
 		final int numPartitions = 5;
 		final int numElementsPerPartition = 1000;
@@ -706,7 +743,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
 	 */
 	public void runBigRecordTestTopology() throws Exception {
-		LOG.info("Starting runBigRecordTestTopology()");
 
 		final String topic = "bigRecordTestTopic";
 		final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
@@ -805,13 +841,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		deleteTestTopic(topic);
 
-		LOG.info("Finished runBigRecordTestTopology()");
 	}
 
 	
 	public void runBrokerFailureTest() throws Exception {
-		LOG.info("starting runBrokerFailureTest()");
-
 		final String topic = "brokerFailureTestTopic";
 
 		final int parallelism = 2;
@@ -878,7 +911,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		// start a new broker:
 		brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));
 
-		LOG.info("finished runBrokerFailureTest()");
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index b4511ce..fd635a5 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -44,7 +44,13 @@ public class KafkaITCase extends KafkaConsumerTestBase {
 	public void testOffsetInZookeeper() throws Exception {
 		runOffsetInZookeeperValidationTest();
 	}
-	
+
+	@Test
+	public void testOffsetAutocommitTest() throws Exception {
+		runOffsetAutocommitTest();
+	}
+
+
 	@Test
 	public void testConcurrentProducerConsumerTopology() throws Exception {
 		runSimpleConcurrentProducerConsumerTopology();

http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index b9fc3de..035737e 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -26,24 +26,44 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public class MockRuntimeContext implements RuntimeContext {
+public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	private final int numberOfParallelSubtasks;
 	private final int indexOfThisSubtask;
 
 	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+		super(new MockStreamOperator(),
+				new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
+				Collections.<String, Accumulator<?, ?>>emptyMap());
 		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
 		this.indexOfThisSubtask = indexOfThisSubtask;
 	}
 
+	private static class MockStreamOperator extends AbstractStreamOperator {
+		private static final long serialVersionUID = -1153976702711944427L;
+
+		@Override
+		public ExecutionConfig getExecutionConfig() {
+			return new ExecutionConfig();
+		}
+	}
+
+	@Override
+	public boolean isCheckpointingEnabled() {
+		return true;
+	}
 
 	@Override
 	public String getTaskName() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 87a9abd..29ff1f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import java.util.HashMap;
@@ -51,6 +53,9 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	
 	/** Type of the values stored in the state, to make sure repeated requests of the state are consistent */
 	private HashMap<String, TypeInformation<?>> stateTypeInfos;
+
+	/** Stream configuration object. */
+	private final StreamConfig streamConfig;
 	
 	
 	public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
@@ -65,6 +70,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		
 		this.operator = operator;
 		this.taskEnvironment = env;
+		this.streamConfig = new StreamConfig(env.getTaskConfiguration());
 	}
 
 	// ------------------------------------------------------------------------
@@ -173,4 +179,31 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 			}
 		}
 	}
+
+	// ------------------ expose (read only) relevant information from the stream config -------- //
+
+	/**
+	 * Returns true if checkpointing is enabled for the running job.
+	 * @return true if checkpointing is enabled.
+	 */
+	public boolean isCheckpointingEnabled() {
+		return streamConfig.isCheckpointingEnabled();
+	}
+
+	/**
+	 * Returns the checkpointing mode
+	 * @return checkpointing mode
+	 */
+	public CheckpointingMode getCheckpointMode() {
+		return streamConfig.getCheckpointMode();
+	}
+
+	/**
+	 * Returns the buffer timeout of the job
+	 * @return buffer timeout (in milliseconds)
+	 */
+	public long getBufferTimeout() {
+		return streamConfig.getBufferTimeout();
+	}
+
 }