You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/23 06:37:31 UTC

kafka git commit: HOTFIX: check offset limits in streamtask when recovering KTable store

Repository: kafka
Updated Branches:
  refs/heads/trunk 04585d99c -> 982ab09a7


HOTFIX: check offset limits in streamtask when recovering KTable store

guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #947 from ymatsuda/hotfix2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/982ab09a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/982ab09a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/982ab09a

Branch: refs/heads/trunk
Commit: 982ab09a790c6dbc2e1a5c52311103fd14ab5233
Parents: 04585d9
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Feb 22 21:37:27 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 22 21:37:27 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/AbstractTask.java | 11 +++++++++++
 .../kafka/streams/processor/internals/StandbyTask.java  | 12 ------------
 .../streams/processor/internals/StreamThreadTest.java   |  5 +++++
 3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/982ab09a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 3f7140a..8ff72bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -69,6 +70,9 @@ public abstract class AbstractTask {
     }
 
     protected void initializeStateStores() {
+        // set initial offset limits
+        initializeOffsetLimits();
+
         for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
             StateStore store = stateStoreSupplier.get();
             store.init(this.processorContext, store);
@@ -109,4 +113,11 @@ public abstract class AbstractTask {
         return Collections.emptyMap();
     }
 
+    protected void initializeOffsetLimits() {
+        for (TopicPartition partition : partitions) {
+            OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
+            stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/982ab09a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 7b6ab8c..da454cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
@@ -71,9 +70,6 @@ public class StandbyTask extends AbstractTask {
         ((StandbyContextImpl) this.processorContext).initialized();
 
         this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
-
-        // set initial offset limits
-        initializeOffsetLimits();
     }
 
     public Map<TopicPartition, Long> checkpointedOffsets() {
@@ -98,12 +94,4 @@ public class StandbyTask extends AbstractTask {
         // reinitialize offset limits
         initializeOffsetLimits();
     }
-
-    protected void initializeOffsetLimits() {
-        for (TopicPartition partition : partitions) {
-            OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
-            stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/982ab09a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 039cb96..e072747 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -144,6 +144,11 @@ public class StreamThreadTest {
             super.commit();
             committed = true;
         }
+
+        @Override
+        protected void initializeOffsetLimits() {
+            // do nothing
+        }
     }
 
     private ByteArraySerializer serializer = new ByteArraySerializer();