Posted to issues@flink.apache.org by rmetzger <gi...@git.apache.org> on 2015/11/09 17:17:39 UTC

[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/1341

    [FLINK-2974] Add periodic offset committer for Kafka

    The offset committer is only enabled when Flink's checkpointing is disabled.
    When checkpointing is enabled, we commit to ZK upon checkpoint completion.
    
    I removed `docs/apis/kafka.md` because it's not linked from anywhere and contained duplicate information.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink2974

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1341.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1341
    
----
commit ea0715c6c9bbfeedaec7faeb47cb0d636c5e01e1
Author: Robert Metzger <rm...@apache.org>
Date:   2015-11-06T16:06:02Z

    [FLINK-2974] Add periodic offset committer for Kafka when checkpointing is disabled

----



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44530920
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +604,75 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    +		private volatile boolean running = true;
    +		private FlinkKafkaConsumer consumer;
    +		private final Object stateUpdateLock = new Object();
    +
    +		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
    +			this.commitInterval = commitInterval;
    +			this.consumer = consumer;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (running) {
    +					try {
    +						Thread.sleep(commitInterval);
    +
    +						//  ------------  commit current offsets ----------------
    +
    +						// create copy of current offsets
    +						long[] currentOffsets;
    +						synchronized (stateUpdateLock) {
    +							currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    +						}
    +
    +						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						//noinspection unchecked
    +						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    +							int partition = tp.partition();
    +							long offset = currentOffsets[partition];
    +							long lastCommitted = consumer.commitedOffsets[partition];
    +
    +							if (offset != OFFSET_NOT_SET) {
    +								if (offset > lastCommitted) {
    +									offsetsToCommit.put(tp, offset);
    +									LOG.debug("Committing offset {} for partition {}", offset, partition);
    +								} else {
    +									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
    +								}
    +							}
    +						}
    +
    +						consumer.offsetHandler.commit(offsetsToCommit);
    +					} catch (InterruptedException e) {
    +						// looks like the thread is being closed. Leave loop
    --- End diff --
    
    Good style is to check whether the `running` flag is still true. If yes, throw an error; if not, break. That catches the case where the thread is interrupted without a shutdown, in which case you want the fetcher to learn about it.
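    
    A minimal sketch of that pattern, assuming the `running` flag and loop from the diff above (this is roughly what the revised diff later in this thread does):
    
        while (running) {
            try {
                Thread.sleep(commitInterval);
                // ... commit current offsets ...
            } catch (InterruptedException e) {
                if (running) {
                    // interrupted without shutdown: rethrow so the fetcher learns about it
                    throw e;
                }
                // running was set to false: regular shutdown, leave the loop
                break;
            }
        }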



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44531013
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -374,12 +376,32 @@ public void open(Configuration parameters) throws Exception {
     			// no restore request. Let the offset handler take care of the initial offset seeking
     			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
     		}
    +
    +		// check whether we need to start the periodic checkpoint committer
    +		StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
    +		if(!streamingRuntimeContext.isCheckpointingEnabled()) {
    +			// we use Kafka's own configuration parameter key for this.
    +			// Note that the default configuration value in Kafka is 60 * 1000, so we use the
    +			// same here.
    +			long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
    +			offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
    +			offsetCommitter.start();
    +			LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
    +		}
     	}
     
     	@Override
     	public void run(SourceContext<T> sourceContext) throws Exception {
     		if (fetcher != null) {
    -			fetcher.run(sourceContext, valueDeserializer, lastOffsets);
    +			// by default, we use the checkpoint lock for updating the state
    +			Object stateUpdateLock = sourceContext.getCheckpointLock();
    +			
    +			// if checkpointing is disabled, we use the checkpoint committer's lock object
    +			StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
    +			if(!streamingRuntimeContext.isCheckpointingEnabled()) {
    --- End diff --
    
    code style: missing space after `if`



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-158377427
  
    Some comments, mainly on redundant constructs and style.
    
    Otherwise good to merge...



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44530703
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -449,6 +481,8 @@ public void close() throws Exception {
     
     	@Override
     	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
    +		Preconditions.checkState(offsetCommitter == null, "Periodic offset committer and checkpointing can not be enabled at the same time");
    --- End diff --
    
    The other `Preconditions` functions are statically imported. It would be nice to follow one style within a class.
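    
    For illustration, the statically imported form would read like this (a sketch; Guava's `Preconditions` is assumed here):
    
        // assumed import: import static com.google.common.base.Preconditions.checkState;
        checkState(offsetCommitter == null,
                "Periodic offset committer and checkpointing can not be enabled at the same time");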



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45462057
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +588,76 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    +		private volatile boolean running = true;
    +		private volatile Thread self;
    +		private FlinkKafkaConsumer consumer;
    +		private final Object stateUpdateLock = new Object();
    +
    +		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
    +			this.commitInterval = commitInterval;
    +			this.consumer = consumer;
    +			this.self = this;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (running) {
    +					try {
    +						Thread.sleep(commitInterval);
    +
    +						//  ------------  commit current offsets ----------------
    +
    +						// create copy of current offsets
    +						long[] currentOffsets;
    +						synchronized (stateUpdateLock) {
    +							currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    +						}
    +
    +						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    +							int partition = tp.partition();
    +							long offset = currentOffsets[partition];
    +							long lastCommitted = consumer.commitedOffsets[partition];
    +
    +							if (offset != OFFSET_NOT_SET) {
    +								if (offset > lastCommitted) {
    +									offsetsToCommit.put(tp, offset);
    +									LOG.debug("Committing offset {} for partition {}", offset, partition);
    +								} else {
    +									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
    +								}
    +							}
    +						}
    +
    +						consumer.offsetHandler.commit(offsetsToCommit);
    +					} catch (InterruptedException e) {
    +						if(running) {
    +							// throw unexpected interruption
    +							throw e;
    +						}
    +						// looks like the thread is being closed. Leave loop
    +						break;
    +					}
    +				}
    +			} catch (Throwable t) {
    +				LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
    +				consumer.fetcher.stopWithError(t);
    +			}
    +		}
    +
    +		public void close() {
    +			this.running = false;
    +			this.self.interrupt();
    --- End diff --
    
    Same as doing `this.interrupt()`. There seems to be some confusion about `this` vs. `Thread.currentThread()` here.
    
    `this` is still the object reference; it denotes the thread represented by that object, not the calling thread.
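    
    A minimal sketch of a `close()` that wakes the committer thread itself, for contrast with the interrupt-the-caller bug from the earlier revision:
    
        public void close() {
            this.running = false;
            // `this` is the committer thread object, so this wakes the sleeping committer
            this.interrupt();
            // Thread.currentThread().interrupt() would instead interrupt the caller of close()
        }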



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-157738335
  
    Thank you for all the comments. I addressed all of them!



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44531764
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +604,75 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    +		private volatile boolean running = true;
    +		private FlinkKafkaConsumer consumer;
    +		private final Object stateUpdateLock = new Object();
    +
    +		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
    +			this.commitInterval = commitInterval;
    +			this.consumer = consumer;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (running) {
    +					try {
    +						Thread.sleep(commitInterval);
    +
    +						//  ------------  commit current offsets ----------------
    +
    +						// create copy of current offsets
    +						long[] currentOffsets;
    +						synchronized (stateUpdateLock) {
    +							currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    +						}
    +
    +						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						//noinspection unchecked
    +						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    --- End diff --
    
    Redundant cast and warning suppression?



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45461644
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -374,12 +372,33 @@ public void open(Configuration parameters) throws Exception {
     			// no restore request. Let the offset handler take care of the initial offset seeking
     			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
     		}
    +
    +
     	}
     
     	@Override
     	public void run(SourceContext<T> sourceContext) throws Exception {
     		if (fetcher != null) {
    +			// For non-checkpointed sources, a thread which periodically commits the current offset into ZK.
    +			PeriodicOffsetCommitter offsetCommitter = null;
    +
    +			// check whether we need to start the periodic checkpoint committer
    +			StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
    +			if (!streamingRuntimeContext.isCheckpointingEnabled()) {
    +				// we use Kafka's own configuration parameter key for this.
    +				// Note that the default configuration value in Kafka is 60 * 1000, so we use the
    +				// same here.
    +				long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
    +				offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
    +				offsetCommitter.start();
    +				LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
    +			}
    +
     			fetcher.run(sourceContext, valueDeserializer, lastOffsets);
    +
    +			if(offsetCommitter != null) {
    --- End diff --
    
    Code style: space after `if` ;-)



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-158377800
  
    Thank you for the review. I'll address the comments and merge it.



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44532032
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -374,12 +376,32 @@ public void open(Configuration parameters) throws Exception {
     			// no restore request. Let the offset handler take care of the initial offset seeking
     			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
     		}
    +
    +		// check whether we need to start the periodic checkpoint committer
    +		StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
    +		if(!streamingRuntimeContext.isCheckpointingEnabled()) {
    +			// we use Kafka's own configuration parameter key for this.
    +			// Note that the default configuration value in Kafka is 60 * 1000, so we use the
    +			// same here.
    +			long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
    +			offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
    +			offsetCommitter.start();
    +			LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
    +		}
     	}
     
     	@Override
     	public void run(SourceContext<T> sourceContext) throws Exception {
     		if (fetcher != null) {
    -			fetcher.run(sourceContext, valueDeserializer, lastOffsets);
    +			// by default, we use the checkpoint lock for updating the state
    +			Object stateUpdateLock = sourceContext.getCheckpointLock();
    +			
    +			// if checkpointing is disabled, we use the checkpoint committer's lock object
    +			StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
    +			if(!streamingRuntimeContext.isCheckpointingEnabled()) {
    +				stateUpdateLock = offsetCommitter.getStateUpdateLock();
    --- End diff --
    
    This seems funky: swapping the lock. I think it breaks the exactly-once guarantees.
    
    Also, why check the runtime context rather than simply `if (offsetCommitter != null)`?



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-155785654
  
    I think this breaks the checkpoint atomicity. The fetcher (which emits the data to the source context) will not lock on the checkpoint lock any more, but on an arbitrary new object created by the offset committer.
    
    Why can't the offset committer simply use the checkpoint lock? Blocking the checkpoint lock for an `Arrays.copyOfRange()` call seems pretty okay.
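    
    A sketch of what that would look like inside the committer's loop, assuming the checkpoint lock object is passed to the committer (the name `checkpointLock` is illustrative):
    
        // copy the offsets under the same lock the fetcher uses for emitting records,
        // so checkpoint atomicity is preserved
        long[] currentOffsets;
        synchronized (checkpointLock) {
            currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
        }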
    




[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45463542
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -290,9 +291,6 @@ public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializ
     			}
     		}
     		LOG.info("Topic {} has {} partitions", topic, partitions.length);
    -
    -		// make sure that we take care of the committing
    -		props.setProperty("enable.auto.commit", "false");
    --- End diff --
    
    Exactly.



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45461902
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +588,76 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    --- End diff --
    
    (Optional) I think it is good style to make the constant members final and sort them before the volatile/mutable ones. It helps code structure and readability...
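    
    Applied to the fields above, that ordering would read roughly like this (a sketch):
    
        // immutable members first ...
        private final FlinkKafkaConsumer consumer;
        private final long commitInterval;
        // ... then the volatile/mutable state
        private volatile boolean running = true;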



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44530563
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -374,12 +376,32 @@ public void open(Configuration parameters) throws Exception {
     			// no restore request. Let the offset handler take care of the initial offset seeking
     			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
     		}
    +
    +		// check whether we need to start the periodic checkpoint committer
    +		StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
    +		if(!streamingRuntimeContext.isCheckpointingEnabled()) {
    --- End diff --
    
    code style: missing space after `if`



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44530755
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +604,75 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    +		private volatile boolean running = true;
    +		private FlinkKafkaConsumer consumer;
    +		private final Object stateUpdateLock = new Object();
    +
    +		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
    +			this.commitInterval = commitInterval;
    +			this.consumer = consumer;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (running) {
    +					try {
    +						Thread.sleep(commitInterval);
    +
    +						//  ------------  commit current offsets ----------------
    +
    +						// create copy of current offsets
    +						long[] currentOffsets;
    +						synchronized (stateUpdateLock) {
    +							currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    +						}
    +
    +						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						//noinspection unchecked
    +						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    +							int partition = tp.partition();
    +							long offset = currentOffsets[partition];
    +							long lastCommitted = consumer.commitedOffsets[partition];
    +
    +							if (offset != OFFSET_NOT_SET) {
    +								if (offset > lastCommitted) {
    +									offsetsToCommit.put(tp, offset);
    +									LOG.debug("Committing offset {} for partition {}", offset, partition);
    +								} else {
    +									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
    +								}
    +							}
    +						}
    +
    +						consumer.offsetHandler.commit(offsetsToCommit);
    +					} catch (InterruptedException e) {
    +						// looks like the thread is being closed. Leave loop
    +						break;
    +					}
    +				}
    +			} catch(Throwable t) {
    +				LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
    +				consumer.fetcher.stopWithError(t);
    +			}
    +		}
    +
    +		public void close() {
    +			running = false;
    +			// interrupt sleep
    +			Thread.currentThread().interrupt();
    --- End diff --
    
    This interrupts the wrong thread, namely the one that calls close() rather than the offset-committing thread.



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-168211441
  
    This change is only relevant when programs do NOT have checkpointing activated.
    In those cases Flink will periodically commit the current stream offsets to Kafka. Previously, Flink never committed offsets without checkpointing, meaning that Kafka clients never saw progress.
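    
    For reference, the commit interval comes from Kafka's own `auto.commit.interval.ms` property (defaulting to 60000 ms), so a non-checkpointed job can tune it when constructing the consumer. A sketch, assuming the 0.8.2 consumer class, a string schema, and an existing `StreamExecutionEnvironment` named `env`:
    
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");
        // commit offsets every 30 seconds instead of the 60 second default
        props.setProperty("auto.commit.interval.ms", "30000");
    
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), props));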



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1341



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44529482
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---
    @@ -173,4 +174,12 @@ public void registerTimer(long time, Triggerable target) {
     			}
     		}
     	}
    +
    +	/**
    +	 * Returns true if checkpointing is enabled for the running job.
    +	 * @return true if checkpointing is enabled.
    +	 */
    +	public boolean isCheckpointingEnabled() {
    +		return taskEnvironment.getTaskConfiguration().getBoolean(StreamConfig.CHECKPOINTING_ENABLED, false);
    --- End diff --
    
    That breaks the way that config values are accessed. The right way is
    `new StreamConfig(taskEnvironment.getTaskConfiguration()).isCheckpointingEnabled()`.
    
    Then you also need not change the visibility of the flag.



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45461871
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +588,76 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    +		private volatile boolean running = true;
    +		private volatile Thread self;
    +		private FlinkKafkaConsumer consumer;
    +		private final Object stateUpdateLock = new Object();
    --- End diff --
    
    You can probably drop this lock; it is not used anywhere. If you want to lock, you need to lock on the proper checkpoint lock.
    
    Locking may not be necessary, though, as these offsets are only a rough, inaccurate notion of progress anyway when checkpointing is not enabled.



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-158373979
  
    You can probably undo most of the docs changes; those are fixed by now...



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by filipefigcorreia <gi...@git.apache.org>.
Github user filipefigcorreia commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-168036891
  
    Do these changes mean that it is no longer possible to have Flink always read from the beginning of a Kafka topic if needed?



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-158391607
  
    I updated the PR. Once Travis is green, I'll merge it.




[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44530629
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -419,6 +441,14 @@ public void cancel() {
     				LOG.warn("Error while closing Kafka connector data fetcher", e);
     			}
     		}
    +
    +		// close the checkpoint committer, if present. We close the committer before
    +		// the offset handler
    +		PeriodicOffsetCommitter committer = this.offsetCommitter;
    +		this.offsetCommitter = null;
    +		if(committer != null) {
    --- End diff --
    
    code style: missing space after `if`



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45462208
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---
    @@ -26,24 +26,42 @@
     import org.apache.flink.api.common.accumulators.LongCounter;
     import org.apache.flink.api.common.cache.DistributedCache;
     import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
    -import org.apache.flink.api.common.functions.RuntimeContext;
     import org.apache.flink.api.common.state.OperatorState;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.MockEnvironment;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
     
     import java.io.Serializable;
    +import java.util.Collections;
     import java.util.List;
     import java.util.Map;
     
    -public class MockRuntimeContext implements RuntimeContext {
    +public class MockRuntimeContext extends StreamingRuntimeContext {
     
     	private final int numberOfParallelSubtasks;
     	private final int indexOfThisSubtask;
     
     	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
    +		super(new MockStreamOperator(),
    +				new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
    +				Collections.<String, Accumulator<?, ?>>emptyMap());
     		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
     		this.indexOfThisSubtask = indexOfThisSubtask;
     	}
     
    +	public static class MockStreamOperator extends AbstractStreamOperator {
    --- End diff --
    
    Can probably be private; no one else uses this, if I see it correctly...



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44530770
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +604,75 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    +		private volatile boolean running = true;
    +		private FlinkKafkaConsumer consumer;
    +		private final Object stateUpdateLock = new Object();
    +
    +		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
    +			this.commitInterval = commitInterval;
    +			this.consumer = consumer;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (running) {
    +					try {
    +						Thread.sleep(commitInterval);
    +
    +						//  ------------  commit current offsets ----------------
    +
    +						// create copy of current offsets
    +						long[] currentOffsets;
    +						synchronized (stateUpdateLock) {
    +							currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    +						}
    +
    +						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						//noinspection unchecked
    +						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    +							int partition = tp.partition();
    +							long offset = currentOffsets[partition];
    +							long lastCommitted = consumer.commitedOffsets[partition];
    +
    +							if (offset != OFFSET_NOT_SET) {
    +								if (offset > lastCommitted) {
    +									offsetsToCommit.put(tp, offset);
    +									LOG.debug("Committing offset {} for partition {}", offset, partition);
    +								} else {
    +									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
    +								}
    +							}
    +						}
    +
    +						consumer.offsetHandler.commit(offsetsToCommit);
    +					} catch (InterruptedException e) {
    +						// looks like the thread is being closed. Leave loop
    +						break;
    +					}
    +				}
    +			} catch(Throwable t) {
    --- End diff --
    
    code style: missing space after `catch`



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45461519
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -290,9 +291,6 @@ public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializ
     			}
     		}
     		LOG.info("Topic {} has {} partitions", topic, partitions.length);
    -
    -		// make sure that we take care of the committing
    -		props.setProperty("enable.auto.commit", "false");
    --- End diff --
    
    Is this not necessary because the low-level consumer does not respect it anyway?



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-155786742
  
    Can you also move the periodic offset committer into the `run()` method? That saves another field that may be accessed concurrently. The run() method can make sure the committer is initialized and cleaned up, so no other method ever needs to worry about it.
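    
    A sketch of that structure, with the committer's lifecycle fully owned by `run()` (a try/finally is added here for illustration, so cleanup also happens on failure):
    
        @Override
        public void run(SourceContext<T> sourceContext) throws Exception {
            PeriodicOffsetCommitter offsetCommitter = null;  // local variable, not a field
            try {
                if (!((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
                    long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
                    offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
                    offsetCommitter.start();
                }
                fetcher.run(sourceContext, valueDeserializer, lastOffsets);
            } finally {
                if (offsetCommitter != null) {
                    offsetCommitter.close();
                }
            }
        }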



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1341#issuecomment-156372109
  
    Thank you for the detailed review. I will address your concerns soon.



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44531824
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +604,75 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    +		private volatile boolean running = true;
    +		private FlinkKafkaConsumer consumer;
    +		private final Object stateUpdateLock = new Object();
    +
    +		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
    +			this.commitInterval = commitInterval;
    +			this.consumer = consumer;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (running) {
    +					try {
    +						Thread.sleep(commitInterval);
    +
    +						//  ------------  commit current offsets ----------------
    +
    +						// create copy of current offsets
    +						long[] currentOffsets;
    +						synchronized (stateUpdateLock) {
    +							currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    +						}
    +
    +						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    +						//noinspection unchecked
    +						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
    +							int partition = tp.partition();
    +							long offset = currentOffsets[partition];
    +							long lastCommitted = consumer.commitedOffsets[partition];
    +
    +							if (offset != OFFSET_NOT_SET) {
    +								if (offset > lastCommitted) {
    +									offsetsToCommit.put(tp, offset);
    +									LOG.debug("Committing offset {} for partition {}", offset, partition);
    +								} else {
    +									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
    +								}
    +							}
    +						}
    +
    +						consumer.offsetHandler.commit(offsetsToCommit);
    +					} catch (InterruptedException e) {
    +						// looks like the thread is being closed. Leave loop
    +						break;
    --- End diff --
    
    Good style is to throw the exception if `running` is still true. That way the fetcher learns about unexpected interruptions of the thread.



[GitHub] flink pull request: [FLINK-2974] Add periodic offset committer for...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45461971
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +588,76 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     		return partitionsToSub;
     	}
    +
    +	/**
    +	 * Thread to periodically commit the current read offset into Zookeeper.
    +	 */
    +	private static class PeriodicOffsetCommitter extends Thread {
    +		private long commitInterval;
    +		private volatile boolean running = true;
    +		private volatile Thread self;
    +		private FlinkKafkaConsumer consumer;
    +		private final Object stateUpdateLock = new Object();
    +
    +		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
    +			this.commitInterval = commitInterval;
    +			this.consumer = consumer;
    +			this.self = this;
    --- End diff --
    
    You don't need this field.

