Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/20 15:35:00 UTC
flink git commit: [FLINK-2974] Add periodic offset committer for Kafka when checkpointing is disabled
Repository: flink
Updated Branches:
refs/heads/master 8dc70f2e7 -> 5864e4fd4
[FLINK-2974] Add periodic offset committer for Kafka when checkpointing is disabled
This closes #1341
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5864e4fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5864e4fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5864e4fd
Branch: refs/heads/master
Commit: 5864e4fd4368f1af6e1bc623c91b8e719693471d
Parents: 8dc70f2
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Nov 6 17:06:02 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Nov 20 15:34:42 2015 +0100
----------------------------------------------------------------------
docs/apis/streaming_guide.md | 1 +
docs/setup/yarn_setup.md | 2 +-
.../util/AbstractRuntimeUDFContext.java | 1 +
.../connectors/kafka/FlinkKafkaConsumer.java | 111 +++++++++++++++++--
.../connectors/kafka/internals/Fetcher.java | 12 +-
.../kafka/internals/LegacyFetcher.java | 9 +-
.../connectors/kafka/KafkaConsumerTestBase.java | 74 +++++++++----
.../streaming/connectors/kafka/KafkaITCase.java | 8 +-
.../kafka/testutils/MockRuntimeContext.java | 24 +++-
.../api/operators/StreamingRuntimeContext.java | 33 ++++++
10 files changed, 234 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index db1f3a7..626df10 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -3499,6 +3499,7 @@ Also note that Flink can only restart the topology if enough processing slots ar
So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
Flink on YARN supports automatic restart of lost YARN containers.
+If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
#### Kafka Producer
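
To illustrate the consumer note added above: a minimal job sketch that relies on the new periodic committer because checkpointing is never enabled. Topic name, addresses and group id are hypothetical, and it assumes one of the versioned consumer classes on this branch (e.g. FlinkKafkaConsumer082):

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AutocommitExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");  // hypothetical address
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical address
        props.setProperty("group.id", "myGroup");                  // hypothetical group
        props.setProperty("auto.commit.interval.ms", "5000");      // commit offsets every 5s

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // note: no env.enableCheckpointing(...) call, so the consumer starts
        // its own periodic offset committer and writes offsets to ZooKeeper
        env.addSource(new FlinkKafkaConsumer082<>("myTopic", new SimpleStringSchema(), props))
           .print();
        env.execute();
    }
}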
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index 7f6950c..12f1986 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -60,7 +60,7 @@ Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management
- at least Apache Hadoop 2.2
- HDFS (Hadoop Distributed File System) (or another distributed file system supported by Hadoop)
-If you have troubles using the Flink YARN client, have a look in the [FAQ section]({{ site.baseurl }}/faq.html).
+If you have troubles using the Flink YARN client, have a look in the [FAQ section](http://flink.apache.org/faq.html#yarn-deployment).
### Start Flink Session
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index be8ac9d..34ec0b5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -175,4 +175,5 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index c7ae1cc..e701639 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
@@ -233,7 +234,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
private transient long[] restoreToOffset;
private volatile boolean running = true;
-
+
// ------------------------------------------------------------------------
/**
@@ -258,8 +259,8 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
this.offsetStore = checkNotNull(offsetStore);
this.fetcherType = checkNotNull(fetcherType);
- if(fetcherType == FetcherType.NEW_HIGH_LEVEL) {
- throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 is not yet " +
+ if (fetcherType == FetcherType.NEW_HIGH_LEVEL) {
+ throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 / 0.9.0 is not yet " +
"supported in Flink");
}
if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) {
@@ -290,9 +291,6 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
}
}
LOG.info("Topic {} has {} partitions", topic, partitions.length);
-
- // make sure that we take care of the committing
- props.setProperty("enable.auto.commit", "false");
}
// ------------------------------------------------------------------------
@@ -302,7 +300,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
-
+
final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
@@ -374,12 +372,33 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
// no restore request. Let the offset handler take care of the initial offset seeking
offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
}
+
+
}
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
if (fetcher != null) {
+ // For non-checkpointed sources, a thread which periodically commits the current offset into ZK.
+ PeriodicOffsetCommitter offsetCommitter = null;
+
+ // check whether we need to start the periodic offset committer
+ StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
+ if (!streamingRuntimeContext.isCheckpointingEnabled()) {
+ // we use Kafka's own configuration parameter key for this.
+ // Note that the default configuration value in Kafka is 60 * 1000, so we use the
+ // same here.
+ long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
+ offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
+ offsetCommitter.start();
+ LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
+ }
+
fetcher.run(sourceContext, valueDeserializer, lastOffsets);
+
+ if (offsetCommitter != null) {
+ offsetCommitter.close();
+ }
}
else {
// this source never completes
@@ -419,7 +438,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
LOG.warn("Error while closing Kafka connector data fetcher", e);
}
}
-
+
OffsetHandler offsetHandler = this.offsetHandler;
this.offsetHandler = null;
if (offsetHandler != null) {
@@ -430,6 +449,8 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
LOG.warn("Error while closing Kafka connector offset handler", e);
}
}
+
+
}
@Override
@@ -567,6 +588,69 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
}
return partitionsToSub;
}
+
+ /**
+ * Thread to periodically commit the current read offset into Zookeeper.
+ */
+ private static class PeriodicOffsetCommitter extends Thread {
+ private final long commitInterval;
+ private final FlinkKafkaConsumer consumer;
+ private volatile boolean running = true;
+
+ public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
+ this.commitInterval = commitInterval;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (running) {
+ try {
+ Thread.sleep(commitInterval);
+ // ------------ commit current offsets ----------------
+
+ // create copy of current offsets
+ long[] currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
+
+ Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
+ for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
+ int partition = tp.partition();
+ long offset = currentOffsets[partition];
+ long lastCommitted = consumer.commitedOffsets[partition];
+
+ if (offset != OFFSET_NOT_SET) {
+ if (offset > lastCommitted) {
+ offsetsToCommit.put(tp, offset);
+ LOG.debug("Committing offset {} for partition {}", offset, partition);
+ } else {
+ LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+ }
+ }
+ }
+
+ consumer.offsetHandler.commit(offsetsToCommit);
+ } catch (InterruptedException e) {
+ if (running) {
+ // throw unexpected interruption
+ throw e;
+ }
+ // looks like the thread is being closed. Leave loop
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
+ consumer.fetcher.stopWithError(t);
+ }
+ }
+
+ public void close() {
+ this.running = false;
+ this.interrupt();
+ }
+
+ }
// ------------------------------------------------------------------------
// Kafka / ZooKeeper communication utilities
@@ -657,10 +741,19 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
return partitions;
}
+ /**
+ * Turns a broker instance into a Node instance.
+ * @param broker broker instance
+ * @return Node representing the given broker
+ */
private static Node brokerToNode(Broker broker) {
return new Node(broker.id(), broker.host(), broker.port());
}
-
+
+ /**
+ * Validates the ZK configuration, checking for required parameters.
+ * @param props Properties to check
+ */
protected static void validateZooKeeperConfig(Properties props) {
if (props.getProperty("zookeeper.connect") == null) {
throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
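
To distill the committer added above into a standalone sketch: snapshot the shared per-partition offsets, commit only offsets that moved forward, and shut down via interrupt. Names are hypothetical; the real thread commits through the consumer's OffsetHandler and uses the consumer's own OFFSET_NOT_SET constant:

import java.util.Arrays;

class PeriodicCommitterSketch extends Thread {
    // stand-in sentinel; the consumer defines its own OFFSET_NOT_SET constant
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    private final long intervalMs;
    private final long[] currentOffsets;    // shared with the fetcher, one slot per partition
    private final long[] committedOffsets;  // last offsets written out, one slot per partition
    private volatile boolean running = true;

    PeriodicCommitterSketch(long intervalMs, long[] current, long[] committed) {
        this.intervalMs = intervalMs;
        this.currentOffsets = current;
        this.committedOffsets = committed;
    }

    @Override
    public void run() {
        try {
            while (running) {
                Thread.sleep(intervalMs);
                // work on a snapshot so the fetcher can keep updating the array
                long[] snapshot = Arrays.copyOf(currentOffsets, currentOffsets.length);
                for (int p = 0; p < snapshot.length; p++) {
                    if (snapshot[p] != OFFSET_NOT_SET && snapshot[p] > committedOffsets[p]) {
                        committedOffsets[p] = snapshot[p]; // the real code commits via the OffsetHandler here
                    }
                }
            }
        } catch (InterruptedException e) {
            // close() interrupts the sleep; simply leave the loop
        }
    }

    void close() {
        running = false;
        interrupt();
    }
}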
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
index 4345926..063d089 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
@@ -64,12 +64,11 @@ public interface Fetcher {
*
* @param sourceContext The source context to emit elements to.
* @param valueDeserializer The deserializer to decode the raw values with.
- * @param lastOffsets The array into which to store the offsets foe which elements are emitted.
+ * @param lastOffsets The array into which to store the offsets for which elements are emitted (operator state)
*
* @param <T> The type of elements produced by the fetcher and emitted to the source context.
*/
- <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer,
- long[] lastOffsets) throws Exception;
+ <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets) throws Exception;
/**
* Set the next offset to read from for the given partition.
@@ -80,4 +79,11 @@ public interface Fetcher {
* @param offsetToRead The offset to seek to.
*/
void seek(TopicPartition topicPartition, long offsetToRead);
+
+ /**
+ * Exit run loop with given error and release all resources.
+ *
+ * @param t Error cause
+ */
+ void stopWithError(Throwable t);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index c4ba103..212ba7d 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -122,7 +122,7 @@ public class LegacyFetcher implements Fetcher {
}
@Override
- public <T> void run(SourceFunction.SourceContext<T> sourceContext,
+ public <T> void run(SourceFunction.SourceContext<T> sourceContext,
DeserializationSchema<T> valueDeserializer,
long[] lastOffsets) throws Exception {
@@ -258,7 +258,8 @@ public class LegacyFetcher implements Fetcher {
*
* @param error The error to report.
*/
- void onErrorInFetchThread(Throwable error) {
+ @Override
+ public void stopWithError(Throwable error) {
if (this.error.compareAndSet(null, error)) {
// we are the first to report an error
if (mainThread != null) {
@@ -445,7 +446,7 @@ public class LegacyFetcher implements Fetcher {
final T value = valueDeserializer.deserialize(valueByte);
final long offset = msg.offset();
- synchronized (sourceContext.getCheckpointLock()) {
+ synchronized (this.sourceContext.getCheckpointLock()) {
sourceContext.collect(value);
offsetsState[partition] = offset;
}
@@ -464,7 +465,7 @@ public class LegacyFetcher implements Fetcher {
}
catch (Throwable t) {
// report to the main thread
- owner.onErrorInFetchThread(t);
+ owner.stopWithError(t);
}
finally {
// end of run loop. close connection to consumer
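
For reference, the first-error-wins pattern behind stopWithError, as a standalone sketch (class and field names hypothetical):

import java.util.concurrent.atomic.AtomicReference;

class ErrorForwarder {
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private volatile Thread mainThread; // the thread blocked in the run loop

    void setMainThread(Thread t) {
        this.mainThread = t;
    }

    // called by any fetch thread; only the first reported error is kept
    void stopWithError(Throwable t) {
        if (error.compareAndSet(null, t)) {
            Thread main = mainThread;
            if (main != null) {
                main.interrupt(); // wake the run loop so it can rethrow
            }
        }
    }

    // called by the run loop once it is woken up
    void rethrowIfError() throws Exception {
        Throwable t = error.get();
        if (t != null) {
            throw new Exception("Error in fetch thread", t);
        }
    }
}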
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ffb6818..3e8154f 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -181,12 +181,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
/**
* Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
*
- * This test is only applicable if Teh Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
+ * This test is only applicable if the Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
*/
public void runOffsetInZookeeperValidationTest() throws Exception {
- LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
-
- final String topicName = "testOffsetHacking";
+ final String topicName = "testOffsetInZK";
final int parallelism = 3;
createTestTopic(topicName, parallelism, 1);
@@ -223,8 +221,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
- assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
- assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+ assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100));
+ assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100));
LOG.info("Manipulating offsets");
@@ -239,8 +237,56 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
readSequence(env3, standardProps, parallelism, topicName, 50, 50);
deleteTestTopic(topicName);
+ }
+
+ public void runOffsetAutocommitTest() throws Exception {
+ final String topicName = "testOffsetAutocommit";
+ final int parallelism = 3;
+
+ createTestTopic(topicName, parallelism, 1);
+
+ StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env1.getConfig().disableSysoutLogging();
+ env1.setNumberOfExecutionRetries(0);
+ env1.setParallelism(parallelism);
+
+ StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ // NOTE: We are not enabling checkpointing!
+ env2.getConfig().disableSysoutLogging();
+ env2.setNumberOfExecutionRetries(0);
+ env2.setParallelism(parallelism);
+
+
+ // write a sequence from 0 to 99 to each of the 3 partitions.
+ writeSequence(env1, topicName, 100, parallelism);
+
+
+ // the readSequence operation sleeps for 20 ms between each record.
+ // with the commit interval set to 500 ms (= 25 records * 20 ms), we
+ // commit roughly 3-4 times while reading the 100 records, and at
+ // least once.
+ Properties readProps = new Properties();
+ readProps.putAll(standardProps);
+ readProps.setProperty("auto.commit.interval.ms", "500");
+
+ // read so that the offset can be committed to ZK
+ readSequence(env2, readProps, parallelism, topicName, 100, 0);
+
+ // get the offset
+ ZkClient zkClient = createZookeeperClient();
+
+ long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
+ long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
+ long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
- LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
+ LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
+
+ // ensure that the offset has been committed
+ assertTrue("Offset of o1=" + o1 + " was not in range", o1 > 0 && o1 <= 100);
+ assertTrue("Offset of o2=" + o2 + " was not in range", o2 > 0 && o2 <= 100);
+ assertTrue("Offset of o3=" + o3 + " was not in range", o3 > 0 && o3 <= 100);
+
+ deleteTestTopic(topicName);
}
/**
@@ -257,8 +303,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
*/
@RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
- LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
-
final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
final int parallelism = 3;
final int elementsPerPartition = 100;
@@ -361,8 +405,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
throw e;
}
- LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
-
deleteTestTopic(topic);
}
@@ -371,7 +413,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* Flink sources.
*/
public void runOneToOneExactlyOnceTest() throws Exception {
- LOG.info("Starting runOneToOneExactlyOnceTest()");
final String topic = "oneToOneTopic";
final int parallelism = 5;
@@ -416,8 +457,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* one Flink source will read multiple Kafka partitions.
*/
public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
- LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
-
final String topic = "oneToManyTopic";
final int numPartitions = 5;
final int numElementsPerPartition = 1000;
@@ -463,8 +502,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* that some Flink sources will read no partitions.
*/
public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
- LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
-
final String topic = "manyToOneTopic";
final int numPartitions = 5;
final int numElementsPerPartition = 1000;
@@ -706,7 +743,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
*/
public void runBigRecordTestTopology() throws Exception {
- LOG.info("Starting runBigRecordTestTopology()");
final String topic = "bigRecordTestTopic";
final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
@@ -805,13 +841,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
deleteTestTopic(topic);
- LOG.info("Finished runBigRecordTestTopology()");
}
public void runBrokerFailureTest() throws Exception {
- LOG.info("starting runBrokerFailureTest()");
-
final String topic = "brokerFailureTestTopic";
final int parallelism = 2;
@@ -878,7 +911,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// start a new broker:
brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));
- LOG.info("finished runBrokerFailureTest()");
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index b4511ce..fd635a5 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -44,7 +44,13 @@ public class KafkaITCase extends KafkaConsumerTestBase {
public void testOffsetInZookeeper() throws Exception {
runOffsetInZookeeperValidationTest();
}
-
+
+ @Test
+ public void testOffsetAutocommitTest() throws Exception {
+ runOffsetAutocommitTest();
+ }
+
+
@Test
public void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index b9fc3de..035737e 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -26,24 +26,44 @@ import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-public class MockRuntimeContext implements RuntimeContext {
+public class MockRuntimeContext extends StreamingRuntimeContext {
private final int numberOfParallelSubtasks;
private final int indexOfThisSubtask;
public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+ super(new MockStreamOperator(),
+ new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
+ Collections.<String, Accumulator<?, ?>>emptyMap());
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.indexOfThisSubtask = indexOfThisSubtask;
}
+ private static class MockStreamOperator extends AbstractStreamOperator {
+ private static final long serialVersionUID = -1153976702711944427L;
+
+ @Override
+ public ExecutionConfig getExecutionConfig() {
+ return new ExecutionConfig();
+ }
+ }
+
+ @Override
+ public boolean isCheckpointingEnabled() {
+ return true;
+ }
@Override
public String getTaskName() {
http://git-wip-us.apache.org/repos/asf/flink/blob/5864e4fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 87a9abd..29ff1f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import java.util.HashMap;
@@ -51,6 +53,9 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
/** Type of the values stored in the state, to make sure repeated requests of the state are consistent */
private HashMap<String, TypeInformation<?>> stateTypeInfos;
+
+ /** Stream configuration object. */
+ private final StreamConfig streamConfig;
public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
@@ -65,6 +70,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
this.operator = operator;
this.taskEnvironment = env;
+ this.streamConfig = new StreamConfig(env.getTaskConfiguration());
}
// ------------------------------------------------------------------------
@@ -173,4 +179,31 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
}
}
}
+
+ // ------------------ expose (read only) relevant information from the stream config -------- //
+
+ /**
+ * Returns true if checkpointing is enabled for the running job.
+ * @return true if checkpointing is enabled.
+ */
+ public boolean isCheckpointingEnabled() {
+ return streamConfig.isCheckpointingEnabled();
+ }
+
+ /**
+ * Returns the checkpointing mode.
+ * @return checkpointing mode
+ */
+ public CheckpointingMode getCheckpointMode() {
+ return streamConfig.getCheckpointMode();
+ }
+
+ /**
+ * Returns the buffer timeout of the job.
+ * @return buffer timeout (in milliseconds)
+ */
+ public long getBufferTimeout() {
+ return streamConfig.getBufferTimeout();
+ }
+
}
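
As a usage note, any rich function running in a streaming job can query the new accessors through the same cast the Kafka consumer uses above. A hedged sketch (class name hypothetical):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

public abstract class CheckpointAwareSource<T> extends RichParallelSourceFunction<T> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // in a streaming job, the runtime context is a StreamingRuntimeContext
        StreamingRuntimeContext ctx = (StreamingRuntimeContext) getRuntimeContext();
        if (!ctx.isCheckpointingEnabled()) {
            // no checkpoints will come: track progress yourself, e.g. start a
            // periodic committer the way the Kafka consumer in this commit does
        }
    }
}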