You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/04 23:51:14 UTC

kafka git commit: KAFKA-3207: Fix StoreChangeLogger to use the right topic name

Repository: kafka
Updated Branches:
  refs/heads/trunk 9b47f9a7f -> 7802a90ed


KAFKA-3207: Fix StoreChangeLogger to use the right topic name

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #865 from guozhangwang/K3207


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7802a90e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7802a90e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7802a90e

Branch: refs/heads/trunk
Commit: 7802a90ed98ea5b9a2b2dcf2e04db1a50e34a2f8
Parents: 9b47f9a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Feb 4 14:51:10 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Feb 4 14:51:10 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/ProcessorContext.java    | 15 ++++++++++++++-
 .../streams/processor/internals/AbstractTask.java    |  6 ++++++
 .../processor/internals/ProcessorContextImpl.java    | 12 +++++++++---
 .../processor/internals/StandbyContextImpl.java      | 15 ++++++++++++---
 .../streams/processor/internals/StandbyTask.java     |  2 +-
 .../state/internals/InMemoryKeyValueLoggedStore.java | 10 +++++-----
 .../state/internals/RawStoreChangeLogger.java        |  8 ++++----
 .../streams/state/internals/StoreChangeLogger.java   | 13 +++++++------
 .../kafka/streams/state/KeyValueStoreTestDriver.java |  2 +-
 .../org/apache/kafka/test/MockProcessorContext.java  |  7 ++++++-
 10 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 9740fa3..79376ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -29,11 +29,18 @@ import java.io.File;
 public interface ProcessorContext {
 
     /**
+     * Returns the job id
+     *
+     * @return the job id
+     */
+    String jobId();
+
+    /**
      * Returns the task id
      *
      * @return the task id
      */
-    TaskId id();
+    TaskId taskId();
 
     /**
      * Returns the key serializer
@@ -84,6 +91,12 @@ public interface ProcessorContext {
      */
     void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback);
 
+    /**
+     * Get the state store given the store name.
+     *
+     * @param name The store name
+     * @return The state store instance
+     */
     StateStore getStateStore(String name);
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 46dd738..162a926 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -36,6 +36,7 @@ import java.util.Set;
 
 public abstract class AbstractTask {
     protected final TaskId id;
+    protected final String jobId;
     protected final ProcessorTopology topology;
     protected final Consumer consumer;
     protected final ProcessorStateManager stateMgr;
@@ -51,6 +52,7 @@ public abstract class AbstractTask {
                            StreamsConfig config,
                            boolean isStandby) {
         this.id = id;
+        this.jobId = jobId;
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
         this.consumer = consumer;
@@ -77,6 +79,10 @@ public abstract class AbstractTask {
         return id;
     }
 
+    public final String jobId() {
+        return jobId;
+    }
+
     public final Set<TopicPartition> partitions() {
         return this.partitions;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 4b72394..c4acc01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -74,12 +74,18 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         this.initialized = true;
     }
 
-    public TaskId id() {
+    public ProcessorStateManager getStateMgr() {
+        return stateMgr;
+    }
+
+    @Override
+    public TaskId taskId() {
         return id;
     }
 
-    public ProcessorStateManager getStateMgr() {
-        return stateMgr;
+    @Override
+    public String jobId() {
+        return task.jobId();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 133d597..82633b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -35,6 +35,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
 
     private final TaskId id;
+    private final String jobId;
     private final StreamsMetrics metrics;
     private final ProcessorStateManager stateMgr;
 
@@ -46,10 +47,12 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private boolean initialized;
 
     public StandbyContextImpl(TaskId id,
+                              String jobId,
                               StreamsConfig config,
                               ProcessorStateManager stateMgr,
                               StreamsMetrics metrics) {
         this.id = id;
+        this.jobId = jobId;
         this.metrics = metrics;
         this.stateMgr = stateMgr;
 
@@ -65,12 +68,18 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
         this.initialized = true;
     }
 
-    public TaskId id() {
+    public ProcessorStateManager getStateMgr() {
+        return stateMgr;
+    }
+
+    @Override
+    public TaskId taskId() {
         return id;
     }
 
-    public ProcessorStateManager getStateMgr() {
-        return stateMgr;
+    @Override
+    public String jobId() {
+        return jobId;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 861b830..7b6ab8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -64,7 +64,7 @@ public class StandbyTask extends AbstractTask {
         super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
 
         // initialize the topology with its own context
-        this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
+        this.processorContext = new StandbyContextImpl(id, jobId, config, stateMgr, metrics);
 
         initializeStateStores();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 5be6483..94349bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -29,25 +29,25 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
 
     private final KeyValueStore<K, V> inner;
     private final Serdes<K, V> serdes;
-    private final String topic;
+    private final String storeName;
 
     private StoreChangeLogger<K, V> changeLogger;
     private StoreChangeLogger.ValueGetter<K, V> getter;
 
-    public InMemoryKeyValueLoggedStore(final String topic, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) {
-        this.topic = topic;
+    public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) {
+        this.storeName = storeName;
         this.inner = inner;
         this.serdes = serdes;
     }
 
     @Override
     public String name() {
-        return this.topic;
+        return this.storeName;
     }
 
     @Override
     public void init(ProcessorContext context) {
-        this.changeLogger = new StoreChangeLogger<>(topic, context, serdes);
+        this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
 
         inner.init(context);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
index cff9d6b..4d99b59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
@@ -39,12 +39,12 @@ public class RawStoreChangeLogger extends StoreChangeLogger<byte[], byte[]> {
         }
     }
 
-    public RawStoreChangeLogger(String topic, ProcessorContext context) {
-        this(topic, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
+    public RawStoreChangeLogger(String storeName, ProcessorContext context) {
+        this(storeName, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
     }
 
-    public RawStoreChangeLogger(String topic, ProcessorContext context, int maxDirty, int maxRemoved) {
-        super(topic, context, context.id().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
+    public RawStoreChangeLogger(String storeName, ProcessorContext context, int maxDirty, int maxRemoved) {
+        super(storeName, context, context.taskId().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
         init();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index b330334..3bbd522 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.Serdes;
 
@@ -46,17 +47,17 @@ public class StoreChangeLogger<K, V> {
     protected Set<K> dirty;
     protected Set<K> removed;
 
-    public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
-        this(topic, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
+    public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization) {
+        this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
     }
 
-    public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
-        this(topic, context, context.id().partition, serialization, maxDirty, maxRemoved);
+    public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+        this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
         init();
     }
 
-    protected StoreChangeLogger(String topic, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
-        this.topic = topic;
+    protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+        this.topic = ProcessorStateManager.storeChangelogTopic(context.jobId(), storeName);
         this.context = context;
         this.partition = partition;
         this.serialization = serialization;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index daa7201..d8b034f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -284,7 +284,7 @@ public class KeyValueStoreTestDriver<K, V> {
         this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
                 serdes.valueDeserializer(), recordCollector) {
             @Override
-            public TaskId id() {
+            public TaskId taskId() {
                 return new TaskId(0, 1);
             }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 31b8335..d597fd2 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -101,11 +101,16 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public TaskId id() {
+    public TaskId taskId() {
         return new TaskId(0, 0);
     }
 
     @Override
+    public String jobId() {
+        return "mockJob";
+    }
+
+    @Override
     public Serializer<?> keySerializer() {
         return keySerializer;
     }