You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/04 23:51:14 UTC
kafka git commit: KAFKA-3207: Fix StateChangeLogger to use the right
topic name
Repository: kafka
Updated Branches:
refs/heads/trunk 9b47f9a7f -> 7802a90ed
KAFKA-3207: Fix StateChangeLogger to use the right topic name
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes #865 from guozhangwang/K3207
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7802a90e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7802a90e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7802a90e
Branch: refs/heads/trunk
Commit: 7802a90ed98ea5b9a2b2dcf2e04db1a50e34a2f8
Parents: 9b47f9a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Feb 4 14:51:10 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Feb 4 14:51:10 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/processor/ProcessorContext.java | 15 ++++++++++++++-
.../streams/processor/internals/AbstractTask.java | 6 ++++++
.../processor/internals/ProcessorContextImpl.java | 12 +++++++++---
.../processor/internals/StandbyContextImpl.java | 15 ++++++++++++---
.../streams/processor/internals/StandbyTask.java | 2 +-
.../state/internals/InMemoryKeyValueLoggedStore.java | 10 +++++-----
.../state/internals/RawStoreChangeLogger.java | 8 ++++----
.../streams/state/internals/StoreChangeLogger.java | 13 +++++++------
.../kafka/streams/state/KeyValueStoreTestDriver.java | 2 +-
.../org/apache/kafka/test/MockProcessorContext.java | 7 ++++++-
10 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 9740fa3..79376ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -29,11 +29,18 @@ import java.io.File;
public interface ProcessorContext {
/**
+ * Returns the job id
+ *
+ * @return the job id
+ */
+ String jobId();
+
+ /**
* Returns the task id
*
* @return the task id
*/
- TaskId id();
+ TaskId taskId();
/**
* Returns the key serializer
@@ -84,6 +91,12 @@ public interface ProcessorContext {
*/
void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback);
+ /**
+ * Get the state store given the store name.
+ *
+ * @param name The store name
+ * @return The state store instance
+ */
StateStore getStateStore(String name);
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 46dd738..162a926 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -36,6 +36,7 @@ import java.util.Set;
public abstract class AbstractTask {
protected final TaskId id;
+ protected final String jobId;
protected final ProcessorTopology topology;
protected final Consumer consumer;
protected final ProcessorStateManager stateMgr;
@@ -51,6 +52,7 @@ public abstract class AbstractTask {
StreamsConfig config,
boolean isStandby) {
this.id = id;
+ this.jobId = jobId;
this.partitions = new HashSet<>(partitions);
this.topology = topology;
this.consumer = consumer;
@@ -77,6 +79,10 @@ public abstract class AbstractTask {
return id;
}
+ public final String jobId() {
+ return jobId;
+ }
+
public final Set<TopicPartition> partitions() {
return this.partitions;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 4b72394..c4acc01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -74,12 +74,18 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
this.initialized = true;
}
- public TaskId id() {
+ public ProcessorStateManager getStateMgr() {
+ return stateMgr;
+ }
+
+ @Override
+ public TaskId taskId() {
return id;
}
- public ProcessorStateManager getStateMgr() {
- return stateMgr;
+ @Override
+ public String jobId() {
+ return task.jobId();
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 133d597..82633b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -35,6 +35,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
private final TaskId id;
+ private final String jobId;
private final StreamsMetrics metrics;
private final ProcessorStateManager stateMgr;
@@ -46,10 +47,12 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
private boolean initialized;
public StandbyContextImpl(TaskId id,
+ String jobId,
StreamsConfig config,
ProcessorStateManager stateMgr,
StreamsMetrics metrics) {
this.id = id;
+ this.jobId = jobId;
this.metrics = metrics;
this.stateMgr = stateMgr;
@@ -65,12 +68,18 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
this.initialized = true;
}
- public TaskId id() {
+ public ProcessorStateManager getStateMgr() {
+ return stateMgr;
+ }
+
+ @Override
+ public TaskId taskId() {
return id;
}
- public ProcessorStateManager getStateMgr() {
- return stateMgr;
+ @Override
+ public String jobId() {
+ return jobId;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 861b830..7b6ab8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -64,7 +64,7 @@ public class StandbyTask extends AbstractTask {
super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
// initialize the topology with its own context
- this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
+ this.processorContext = new StandbyContextImpl(id, jobId, config, stateMgr, metrics);
initializeStateStores();
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 5be6483..94349bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -29,25 +29,25 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
private final KeyValueStore<K, V> inner;
private final Serdes<K, V> serdes;
- private final String topic;
+ private final String storeName;
private StoreChangeLogger<K, V> changeLogger;
private StoreChangeLogger.ValueGetter<K, V> getter;
- public InMemoryKeyValueLoggedStore(final String topic, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) {
- this.topic = topic;
+ public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) {
+ this.storeName = storeName;
this.inner = inner;
this.serdes = serdes;
}
@Override
public String name() {
- return this.topic;
+ return this.storeName;
}
@Override
public void init(ProcessorContext context) {
- this.changeLogger = new StoreChangeLogger<>(topic, context, serdes);
+ this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
inner.init(context);
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
index cff9d6b..4d99b59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
@@ -39,12 +39,12 @@ public class RawStoreChangeLogger extends StoreChangeLogger<byte[], byte[]> {
}
}
- public RawStoreChangeLogger(String topic, ProcessorContext context) {
- this(topic, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
+ public RawStoreChangeLogger(String storeName, ProcessorContext context) {
+ this(storeName, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
}
- public RawStoreChangeLogger(String topic, ProcessorContext context, int maxDirty, int maxRemoved) {
- super(topic, context, context.id().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
+ public RawStoreChangeLogger(String storeName, ProcessorContext context, int maxDirty, int maxRemoved) {
+ super(storeName, context, context.taskId().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
init();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index b330334..3bbd522 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.Serdes;
@@ -46,17 +47,17 @@ public class StoreChangeLogger<K, V> {
protected Set<K> dirty;
protected Set<K> removed;
- public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
- this(topic, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
+ public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization) {
+ this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
}
- public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
- this(topic, context, context.id().partition, serialization, maxDirty, maxRemoved);
+ public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+ this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
init();
}
- protected StoreChangeLogger(String topic, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
- this.topic = topic;
+ protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+ this.topic = ProcessorStateManager.storeChangelogTopic(context.jobId(), storeName);
this.context = context;
this.partition = partition;
this.serialization = serialization;
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index daa7201..d8b034f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -284,7 +284,7 @@ public class KeyValueStoreTestDriver<K, V> {
this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
serdes.valueDeserializer(), recordCollector) {
@Override
- public TaskId id() {
+ public TaskId taskId() {
return new TaskId(0, 1);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 31b8335..d597fd2 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -101,11 +101,16 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
}
@Override
- public TaskId id() {
+ public TaskId taskId() {
return new TaskId(0, 0);
}
@Override
+ public String jobId() {
+ return "mockJob";
+ }
+
+ @Override
public Serializer<?> keySerializer() {
return keySerializer;
}