You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/23 06:37:31 UTC
kafka git commit: HOTFIX: check offset limits in streamtask when
recovering KTable store
Repository: kafka
Updated Branches:
refs/heads/trunk 04585d99c -> 982ab09a7
HOTFIX: check offset limits in streamtask when recovering KTable store
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #947 from ymatsuda/hotfix2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/982ab09a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/982ab09a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/982ab09a
Branch: refs/heads/trunk
Commit: 982ab09a790c6dbc2e1a5c52311103fd14ab5233
Parents: 04585d9
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Feb 22 21:37:27 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 22 21:37:27 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/processor/internals/AbstractTask.java | 11 +++++++++++
.../kafka/streams/processor/internals/StandbyTask.java | 12 ------------
.../streams/processor/internals/StreamThreadTest.java | 5 +++++
3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/982ab09a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 3f7140a..8ff72bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -69,6 +70,9 @@ public abstract class AbstractTask {
}
protected void initializeStateStores() {
+ // set initial offset limits
+ initializeOffsetLimits();
+
for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
StateStore store = stateStoreSupplier.get();
store.init(this.processorContext, store);
@@ -109,4 +113,11 @@ public abstract class AbstractTask {
return Collections.emptyMap();
}
+ protected void initializeOffsetLimits() {
+ for (TopicPartition partition : partitions) {
+ OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
+ stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/982ab09a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 7b6ab8c..da454cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
@@ -71,9 +70,6 @@ public class StandbyTask extends AbstractTask {
((StandbyContextImpl) this.processorContext).initialized();
this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
-
- // set initial offset limits
- initializeOffsetLimits();
}
public Map<TopicPartition, Long> checkpointedOffsets() {
@@ -98,12 +94,4 @@ public class StandbyTask extends AbstractTask {
// reinitialize offset limits
initializeOffsetLimits();
}
-
- protected void initializeOffsetLimits() {
- for (TopicPartition partition : partitions) {
- OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
- stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/982ab09a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 039cb96..e072747 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -144,6 +144,11 @@ public class StreamThreadTest {
super.commit();
committed = true;
}
+
+ @Override
+ protected void initializeOffsetLimits() {
+ // do nothing
+ }
}
private ByteArraySerializer serializer = new ByteArraySerializer();