Posted to commits@kafka.apache.org by gw...@apache.org on 2015/12/08 00:12:24 UTC
[2/2] kafka git commit: KAFKA-2804: manage changelog topics through ZK in PartitionAssignor
KAFKA-2804: manage changelog topics through ZK in PartitionAssignor
Author: Guozhang Wang <wa...@gmail.com>
Author: wangguoz@gmail.com <gu...@Guozhang-Macbook.local>
Author: Guozhang Wang <gu...@Guozhang-Macbook.local>
Reviewers: Yasuhiro Matsuda
Closes #579 from guozhangwang/K2804
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d05fa0a0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d05fa0a0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d05fa0a0
Branch: refs/heads/trunk
Commit: d05fa0a03bc9bcfcff8d73cbf1b22832ebdb75a2
Parents: 23f36c5
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Dec 7 15:12:09 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Dec 7 15:12:09 2015 -0800
----------------------------------------------------------------------
bin/kafka-run-class.sh | 2 +-
build.gradle | 2 +
checkstyle/import-control.xml | 8 +
.../apache/kafka/streams/KafkaStreaming.java | 40 +--
.../apache/kafka/streams/StreamingConfig.java | 48 +++-
.../kafka/streams/examples/KStreamJob.java | 2 +-
.../kafka/streams/examples/ProcessorJob.java | 8 +-
.../streams/kstream/SlidingWindowSupplier.java | 2 +-
.../streams/kstream/internals/KTableImpl.java | 3 +-
.../processor/DefaultPartitionGrouper.java | 7 +-
.../streams/processor/PartitionGrouper.java | 31 +--
.../streams/processor/ProcessorContext.java | 2 +-
.../streams/processor/TopologyBuilder.java | 139 +++++++---
.../processor/internals/AbstractTask.java | 3 +-
.../KafkaStreamingPartitionAssignor.java | 270 +++++++++++++++++--
.../internals/ProcessorContextImpl.java | 4 +-
.../internals/ProcessorStateManager.java | 63 +++--
.../processor/internals/StandbyContextImpl.java | 4 +-
.../processor/internals/StandbyTask.java | 8 +-
.../streams/processor/internals/StreamTask.java | 8 +-
.../processor/internals/StreamThread.java | 43 ++-
.../internals/assignment/SubscriptionInfo.java | 32 +--
.../streams/state/MeteredKeyValueStore.java | 2 +-
.../kafka/streams/state/RocksDBStore.java | 1 -
.../kafka/streams/StreamingConfigTest.java | 21 +-
.../processor/DefaultPartitionGrouperTest.java | 32 ++-
.../streams/processor/TopologyBuilderTest.java | 31 ++-
.../KafkaStreamingPartitionAssignorTest.java | 201 +++++++++++---
.../internals/ProcessorStateManagerTest.java | 152 ++++++-----
.../processor/internals/StandbyTaskTest.java | 46 ++--
.../processor/internals/StreamTaskTest.java | 4 +-
.../processor/internals/StreamThreadTest.java | 15 +-
.../assignment/SubscriptionInfoTest.java | 5 +-
.../streams/state/KeyValueStoreTestDriver.java | 2 +-
.../apache/kafka/test/MockProcessorContext.java | 2 +-
.../kafka/test/MockStateStoreSupplier.java | 2 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 8 +-
.../apache/kafka/test/UnlimitedWindowDef.java | 2 +-
38 files changed, 899 insertions(+), 356 deletions(-)
----------------------------------------------------------------------
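At a high level, this patch moves changelog-topic management into the partition
assignor: for every internal state store, the assignor derives a changelog topic
name from the job id and creates or resizes that topic through ZooKeeper during
assignment. A minimal sketch of the naming rule, mirroring the storeChangelogTopic
helper added to ProcessorStateManager below:

    // Sketch of the naming rule introduced by this patch.
    public static String storeChangelogTopic(String jobId, String storeName) {
        return jobId + "-" + storeName + "-changelog";
    }

    // e.g. storeChangelogTopic("example-processor", "LOCAL-STATE")
    // returns "example-processor-LOCAL-STATE-changelog"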
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index fcf442b..2551338 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -57,7 +57,7 @@ do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/stream/build/libs/kafka-streams*.jar;
+for file in $base_dir/streams/build/libs/kafka-streams*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e24279e..224f550 100644
--- a/build.gradle
+++ b/build.gradle
@@ -561,6 +561,8 @@ project(':streams') {
compile project(':clients')
compile "$slf4jlog4j"
compile 'org.rocksdb:rocksdbjni:3.10.1'
+ compile 'com.101tec:zkclient:0.7' // this dependency should be removed after KIP-4
+ compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version" // this dependency should be removed after KIP-4
testCompile "$junit"
testCompile project(path: ':clients', configuration: 'archives')
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e221dce..a65a2dc 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -129,6 +129,14 @@
<subpackage name="state">
<allow pkg="org.rocksdb" />
</subpackage>
+
+ <subpackage name="processor">
+ <subpackage name="internals">
+ <allow pkg="org.I0Itec.zkclient" />
+ <allow pkg="com.fasterxml.jackson" />
+ <allow pkg="org.apache.zookeeper" />
+ </subpackage>
+ </subpackage>
</subpackage>
<subpackage name="log4jappender">
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
index fc1fdae..0d99739 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
@@ -73,9 +73,7 @@ public class KafkaStreaming {
private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
- private static final String JMX_PREFIX = "kafka.streaming";
-
- private final Time time;
+ private static final String JMX_PREFIX = "kafka.streams";
// container states
private static final int CREATED = 0;
@@ -85,29 +83,39 @@ public class KafkaStreaming {
private final StreamThread[] threads;
- private String clientId;
- private final UUID uuid;
- private final Metrics metrics;
+ // processId is expected to be unique across JVMs and is used
+ // in the userData of the subscription request so that the assignor is aware
+ // of the co-location of each stream thread's consumers. It is for internal
+ // use only and should not be exposed to users at all.
+ private final UUID processId;
public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
// create the metrics
- this.time = new SystemTime();
- this.uuid = UUID.randomUUID();
+ Time time = new SystemTime();
- MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
- .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
- TimeUnit.MILLISECONDS);
- clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
+ this.processId = UUID.randomUUID();
+
+ String jobId = config.getString(StreamingConfig.JOB_ID_CONFIG);
+ if (jobId.length() <= 0)
+ jobId = "kafka-streams";
+
+ String clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
- clientId = "streaming-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
+ clientId = jobId + "-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
+
List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
+ MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
- this.metrics = new Metrics(metricConfig, reporters, time);
+
+ MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+ TimeUnit.MILLISECONDS);
+
+ Metrics metrics = new Metrics(metricConfig, reporters, time);
this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
for (int i = 0; i < this.threads.length; i++) {
- this.threads[i] = new StreamThread(builder, config, this.clientId, this.uuid, this.metrics, this.time);
+ this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index 437afd8..e89d030 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -42,6 +42,10 @@ public class StreamingConfig extends AbstractConfig {
public static final String STATE_DIR_CONFIG = "state.dir";
private static final String STATE_DIR_DOC = "Directory location for state store.";
+ /** <code>zookeeper.connect</code> */
+ public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
+ private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
+
/** <code>commit.interval.ms</code> */
public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
@@ -83,8 +87,9 @@ public class StreamingConfig extends AbstractConfig {
public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
- /** <code>client.id</code> */
- public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+ /** <code>job.id</code> */
+ public static final String JOB_ID_CONFIG = "job.id";
+ public static final String JOB_ID_DOC = "An id string to identify the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, and 3) the changelog topic prefix.";
/** <code>key.serializer</code> */
public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -107,19 +112,30 @@ public class StreamingConfig extends AbstractConfig {
/** <code>metric.reporters</code> */
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
- /**
- * <code>bootstrap.servers</code>
- */
+ /** <code>bootstrap.servers</code> */
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ /** <code>client.id</code> */
+ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+
private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
static {
- CONFIG = new ConfigDef().define(CLIENT_ID_CONFIG,
+ CONFIG = new ConfigDef().define(JOB_ID_CONFIG,
+ Type.STRING,
+ "",
+ Importance.MEDIUM,
+ StreamingConfig.JOB_ID_DOC)
+ .define(CLIENT_ID_CONFIG,
Type.STRING,
"",
Importance.MEDIUM,
CommonClientConfigs.CLIENT_ID_DOC)
+ .define(ZOOKEEPER_CONNECT_CONFIG,
+ Type.STRING,
+ "",
+ Importance.HIGH,
+ StreamingConfig.ZOOKEEPER_CONNECT_DOC)
.define(STATE_DIR_CONFIG,
Type.STRING,
SYSTEM_TEMP_DIRECTORY,
@@ -221,20 +237,27 @@ public class StreamingConfig extends AbstractConfig {
super(CONFIG, props);
}
- public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
+ public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
Map<String, Object> props = getBaseConsumerConfigs();
+
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
- props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
+
+ props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
+
return props;
}
- public Map<String, Object> getRestoreConsumerConfigs() {
+ public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
Map<String, Object> props = getBaseConsumerConfigs();
- // no group id for a restore consumer
+ // no need to set group id for a restore consumer
props.remove(ConsumerConfig.GROUP_ID_CONFIG);
+ props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
+
return props;
}
@@ -248,11 +271,12 @@ public class StreamingConfig extends AbstractConfig {
props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+ props.remove(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
return props;
}
- public Map<String, Object> getProducerConfigs() {
+ public Map<String, Object> getProducerConfigs(String clientId) {
Map<String, Object> props = this.originals();
// set producer default property values
@@ -263,6 +287,8 @@ public class StreamingConfig extends AbstractConfig {
props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+ props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
+
return props;
}
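Taken together with the KafkaStreaming changes above, the client ids handed to the
underlying clients are now all derived from the job id. A sketch of the resulting
names, assuming job.id is "example-processor", no explicit client.id is set, and the
stream thread passes the job id as the group id:

    String jobId    = "example-processor";
    String clientId = jobId + "-" + 1;                         // "example-processor-1"

    // getConsumerConfigs(thread, jobId, clientId):
    //   group.id  = "example-processor"
    //   client.id = "example-processor-1-consumer"
    // getRestoreConsumerConfigs(clientId):
    //   client.id = "example-processor-1-restore-consumer"    (no group.id)
    // getProducerConfigs(clientId):
    //   client.id = "example-processor-1-producer"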
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
index 87368c1..819bd68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -34,7 +34,7 @@ public class KStreamJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
- props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-KStream-Job");
+ props.put(StreamingConfig.JOB_ID_CONFIG, "example-kstream");
props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 882c7ed..2d0b79f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -49,7 +49,7 @@ public class ProcessorJob {
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
- this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
+ this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("LOCAL-STATE");
}
@Override
@@ -90,8 +90,9 @@ public class ProcessorJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
- props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
+ props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor");
props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
@@ -104,8 +105,7 @@ public class ProcessorJob {
builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
- builder.addStateStore(Stores.create("local-state").withStringKeys().withIntegerValues().inMemory().build());
- builder.connectProcessorAndStateStores("local-state", "PROCESS");
+ builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS");
builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
index 0cf969f..80e548f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
@@ -85,7 +85,7 @@ public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
this.context = context;
this.partition = context.id().partition;
SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
- context.register(this, restoreFunc);
+ context.register(this, true, restoreFunc);
for (ValueList<V> valueList : map.values()) {
valueList.clearDirtyValues();
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 5b2b031..47c9b09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -177,7 +177,8 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
if (!source.isMaterialized()) {
StateStoreSupplier storeSupplier =
new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
- topology.addStateStore(storeSupplier, name);
+ // mark this store as non-internal since it is read directly from a user topic
+ topology.addStateStore(storeSupplier, false, name);
source.materialize();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 7d2188a..923a217 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -29,9 +29,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public class DefaultPartitionGrouper extends PartitionGrouper {
+public class DefaultPartitionGrouper implements PartitionGrouper {
- public Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata) {
+ public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
@@ -71,3 +71,6 @@ public class DefaultPartitionGrouper extends PartitionGrouper {
}
}
+
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 187c4ce..a40a1c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -19,39 +19,18 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
import java.util.Map;
import java.util.Set;
-public abstract class PartitionGrouper {
-
- protected Map<Integer, Set<String>> topicGroups;
-
- private KafkaStreamingPartitionAssignor partitionAssignor = null;
+public interface PartitionGrouper {
/**
* Returns a map of task ids to groups of partitions.
*
- * @param metadata
+ * @param topicGroups The subscribed topic groups
+ * @param metadata Metadata of the consuming cluster
* @return a map of task ids to groups of partitions
*/
- public abstract Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata);
-
- public void topicGroups(Map<Integer, Set<String>> topicGroups) {
- this.topicGroups = topicGroups;
- }
-
- public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
- this.partitionAssignor = partitionAssignor;
- }
-
- public Set<TaskId> taskIds(TopicPartition partition) {
- return partitionAssignor.taskIds(partition);
- }
-
- public Map<TaskId, Set<TopicPartition>> standbyTasks() {
- return partitionAssignor.standbyTasks();
- }
-
-}
+ Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata);
+}
\ No newline at end of file
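With PartitionGrouper reduced to a single-method interface, a custom grouper is now a
stateless function from topic groups and cluster metadata to tasks. A hypothetical
implementation assigning one task per partition number within each topic group (the
same shape as DefaultPartitionGrouper above):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.PartitionGrouper;
    import org.apache.kafka.streams.processor.TaskId;

    public class PerPartitionGrouper implements PartitionGrouper {
        @Override
        public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
            Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
            for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
                for (String topic : entry.getValue()) {
                    Integer partitions = metadata.partitionCountForTopic(topic);
                    if (partitions == null)
                        continue; // no metadata for this topic yet
                    for (int p = 0; p < partitions; p++) {
                        TaskId task = new TaskId(entry.getKey(), p);
                        Set<TopicPartition> group = groups.get(task);
                        if (group == null) {
                            group = new HashSet<>();
                            groups.put(task, group);
                        }
                        group.add(new TopicPartition(topic, p));
                    }
                }
            }
            return groups;
        }
    }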
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 88ac64e..fa19ed7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -79,7 +79,7 @@ public interface ProcessorContext {
*
* @param store the storage engine
*/
- void register(StateStore store, StateRestoreCallback stateRestoreCallback);
+ void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback);
StateStore getStateStore(String name);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 021a47f..3cfb22b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -45,14 +45,17 @@ import java.util.Set;
* its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
* processes them, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
* is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link KafkaStreaming} instance
- * that will then {@link KafkaStreaming#start() begin consuming, processing, and producing messages}.
+ * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreaming}
+ * instance that will then {@link org.apache.kafka.streams.KafkaStreaming#start() begin consuming, processing, and producing messages}.
*/
public class TopologyBuilder {
// node factories in a topological order
private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
+ // state factories
+ private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
+
private final Set<String> sourceTopicNames = new HashSet<>();
private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
@@ -60,8 +63,18 @@ public class TopologyBuilder {
private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
private Map<Integer, Set<String>> nodeGroups = null;
- private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
- private Map<String, Set<String>> stateStoreUsers = new HashMap();
+ private static class StateStoreFactory {
+ public final Set<String> users;
+
+ public final boolean isInternal;
+ public final StateStoreSupplier supplier;
+
+ StateStoreFactory(boolean isInternal, StateStoreSupplier supplier) {
+ this.isInternal = isInternal;
+ this.supplier = supplier;
+ this.users = new HashSet<>();
+ }
+ }
private static abstract class NodeFactory {
public final String name;
@@ -88,6 +101,7 @@ public class TopologyBuilder {
stateStoreNames.add(stateStoreName);
}
+ @SuppressWarnings("unchecked")
@Override
public ProcessorNode build() {
return new ProcessorNode(name, supplier.get(), stateStoreNames);
@@ -106,6 +120,7 @@ public class TopologyBuilder {
this.valDeserializer = valDeserializer;
}
+ @SuppressWarnings("unchecked")
@Override
public ProcessorNode build() {
return new SourceNode(name, keyDeserializer, valDeserializer);
@@ -125,12 +140,40 @@ public class TopologyBuilder {
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
}
+
+ @SuppressWarnings("unchecked")
@Override
public ProcessorNode build() {
return new SinkNode(name, topic, keySerializer, valSerializer);
}
}
+ public static class TopicsInfo {
+ public Set<String> sourceTopics;
+ public Set<String> stateNames;
+
+ public TopicsInfo(Set<String> sourceTopics, Set<String> stateNames) {
+ this.sourceTopics = sourceTopics;
+ this.stateNames = stateNames;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof TopicsInfo) {
+ TopicsInfo other = (TopicsInfo) o;
+ return other.sourceTopics.equals(this.sourceTopics) && other.stateNames.equals(this.stateNames);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ long n = ((long) sourceTopics.hashCode() << 32) | (long) stateNames.hashCode();
+ return (int) (n % 0xFFFFFFFFL);
+ }
+ }
+
/**
* Create a new builder.
*/
@@ -138,9 +181,9 @@ public class TopologyBuilder {
/**
* Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
- * The source will use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
- * {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
- * {@link StreamingConfig streaming configuration}.
+ * The source will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
+ * {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
*
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
@@ -158,11 +201,11 @@ public class TopologyBuilder {
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
- * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
- * {@link StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
* @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
- * should use the {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
- * {@link StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
@@ -186,9 +229,9 @@ public class TopologyBuilder {
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
- * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
- * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link StreamingConfig streaming configuration}.
+ * The sink will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+ * {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
@@ -205,11 +248,11 @@ public class TopologyBuilder {
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
- * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
- * {@link StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
* @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
- * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
* @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
@@ -271,12 +314,12 @@ public class TopologyBuilder {
* @param supplier the supplier used to obtain this state store {@link StateStore} instance
* @return this builder instance so methods can be chained together; never null
*/
- public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
- if (stateStores.containsKey(supplier.name())) {
+ public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
+ if (stateFactories.containsKey(supplier.name())) {
throw new TopologyException("StateStore " + supplier.name() + " is already added.");
}
- stateStores.put(supplier.name(), supplier);
- stateStoreUsers.put(supplier.name(), new HashSet<String>());
+
+ stateFactories.put(supplier.name(), new StateStoreFactory(isInternal, supplier));
if (processorNames != null) {
for (String processorName : processorNames) {
@@ -288,6 +331,16 @@ public class TopologyBuilder {
}
/**
+ * Adds a state store
+ *
+ * @param supplier the supplier used to obtain this state store {@link StateStore} instance
+ * @return this builder instance so methods can be chained together; never null
+ */
+ public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
+ return this.addStateStore(supplier, true, processorNames);
+ }
+
+ /**
* Connects the processor and the state stores
*
* @param processorName the name of the processor
@@ -305,22 +358,22 @@ public class TopologyBuilder {
}
private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
- if (!stateStores.containsKey(stateStoreName))
+ if (!stateFactories.containsKey(stateStoreName))
throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
if (!nodeFactories.containsKey(processorName))
throw new TopologyException("Processor " + processorName + " is not added yet.");
- Set<String> users = stateStoreUsers.get(stateStoreName);
- Iterator<String> iter = users.iterator();
+ StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+ Iterator<String> iter = stateStoreFactory.users.iterator();
if (iter.hasNext()) {
String user = iter.next();
nodeGrouper.unite(user, processorName);
}
- users.add(processorName);
+ stateStoreFactory.users.add(processorName);
- NodeFactory factory = nodeFactories.get(processorName);
- if (factory instanceof ProcessorNodeFactory) {
- ((ProcessorNodeFactory) factory).addStateStore(stateStoreName);
+ NodeFactory nodeFactory = nodeFactories.get(processorName);
+ if (nodeFactory instanceof ProcessorNodeFactory) {
+ ((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName);
} else {
throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
}
@@ -332,20 +385,32 @@ public class TopologyBuilder {
*
* @return groups of topic names
*/
- public Map<Integer, Set<String>> topicGroups() {
- Map<Integer, Set<String>> topicGroups = new HashMap<>();
+ public Map<Integer, TopicsInfo> topicGroups() {
+ Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
if (nodeGroups == null)
nodeGroups = makeNodeGroups();
for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
- Set<String> topicGroup = new HashSet<>();
+ Set<String> sourceTopics = new HashSet<>();
+ Set<String> stateNames = new HashSet<>();
for (String node : entry.getValue()) {
+ // if the node is a source node, add to the source topics
String[] topics = nodeToTopics.get(node);
if (topics != null)
- topicGroup.addAll(Arrays.asList(topics));
+ sourceTopics.addAll(Arrays.asList(topics));
+
+ // if the node is connected to a state, add to the state topics
+ for (StateStoreFactory stateFactory : stateFactories.values()) {
+
+ if (stateFactory.isInternal && stateFactory.users.contains(node)) {
+ stateNames.add(stateFactory.supplier.name());
+ }
+ }
}
- topicGroups.put(entry.getKey(), Collections.unmodifiableSet(topicGroup));
+ topicGroups.put(entry.getKey(), new TopicsInfo(
+ Collections.unmodifiableSet(sourceTopics),
+ Collections.unmodifiableSet(stateNames)));
}
return Collections.unmodifiableMap(topicGroups);
@@ -431,9 +496,9 @@ public class TopologyBuilder {
/**
* Build the topology for the specified topic group. This is called automatically when passing this builder into the
- * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
+ * {@link org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)} constructor.
*
- * @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
+ * @see org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)
*/
public ProcessorTopology build(Integer topicGroupId) {
Set<String> nodeGroup;
@@ -467,7 +532,7 @@ public class TopologyBuilder {
}
for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
if (!stateStoreMap.containsKey(stateStoreName)) {
- stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+ stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier);
}
}
} else if (factory instanceof SourceNodeFactory) {
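In practice the new isInternal flag determines whether a store contributes a state
name to its topic group, and hence whether the assignor will manage a changelog topic
for it. A sketch of how this surfaces through the builder, reusing the node and store
names from the ProcessorJob example above:

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("SOURCE", "topic-source");
    builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");

    // internal store: the assignor will manage "<job.id>-LOCAL-STATE-changelog"
    builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS");

    Map<Integer, TopologyBuilder.TopicsInfo> groups = builder.topicGroups();
    // e.g. groups.get(0).sourceTopics == {"topic-source"}
    //      groups.get(0).stateNames   == {"LOCAL-STATE"}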
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index e1b4d62..b3255bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -43,6 +43,7 @@ public abstract class AbstractTask {
protected ProcessorContext processorContext;
protected AbstractTask(TaskId id,
+ String jobId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
@@ -58,7 +59,7 @@ public abstract class AbstractTask {
try {
File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
// if partitions is null, this is a standby task
- this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, isStandby);
+ this.stateMgr = new ProcessorStateManager(jobId, id.partition, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
throw new KafkaException("Error while creating the state manager", e);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 54d5567..29c67f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -21,9 +21,11 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
@@ -32,7 +34,21 @@ import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.ZooDefs;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.I0Itec.zkclient.ZkClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -46,10 +62,146 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
private StreamThread streamThread;
+
private int numStandbyReplicas;
+ private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
+ private Map<String, Set<TaskId>> stateNameToTaskIds;
private Map<TaskId, Set<TopicPartition>> standbyTasks;
+
+ // TODO: the following ZK dependency should be removed after KIP-4
+ private static final String ZK_TOPIC_PATH = "/brokers/topics";
+ private static final String ZK_BROKER_PATH = "/brokers/ids";
+ private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
+
+ private ZkClient zkClient;
+
+ private class ZKStringSerializer implements ZkSerializer {
+
+ @Override
+ public byte[] serialize(Object data) {
+ try {
+ return ((String) data).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) {
+ try {
+ if (bytes == null)
+ return null;
+ else
+ return new String(bytes, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError(e);
+ }
+ }
+ }
+
+ private List<Integer> getBrokers() {
+ List<Integer> brokers = new ArrayList<>();
+ for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
+ brokers.add(Integer.parseInt(broker));
+ }
+ Collections.sort(brokers);
+
+ log.debug("Read brokers {} from ZK in partition assignor.", brokers);
+
+ return brokers;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+ String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
+
+ if (data == null) return null;
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+
+ Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
+
+ });
+
+ Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
+
+ log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
+
+ return partitions;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
+ log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
+
+ // we always assign leaders to brokers starting at the first one with replication factor 1
+ List<Integer> brokers = getBrokers();
+
+ Map<Integer, List<Integer>> assignment = new HashMap<>();
+ for (int i = 0; i < numPartitions; i++) {
+ assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
+ }
+
+ // try to write to ZK with open ACL
+ try {
+ Map<String, Object> dataMap = new HashMap<>();
+ dataMap.put("version", 1);
+ dataMap.put("partitions", assignment);
+
+ ObjectMapper mapper = new ObjectMapper();
+ String data = mapper.writeValueAsString(dataMap);
+
+ zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ } catch (JsonProcessingException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ private void deleteTopic(String topic) throws ZkNodeExistsException {
+ log.debug("Deleting topic {} from ZK in partition assignor.", topic);
+
+ zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
+ log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
+
+ // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
+ List<Integer> brokers = getBrokers();
+
+ int startIndex = existingAssignment.size();
+
+ Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
+
+ for (int i = 0; i < numPartitions; i++) {
+ newAssignment.put(i + startIndex, Collections.singletonList(brokers.get((i + startIndex) % brokers.size())));
+ }
+
+ // try to write to ZK with open ACL
+ try {
+ Map<String, Object> dataMap = new HashMap<>();
+ dataMap.put("version", 1);
+ dataMap.put("partitions", newAssignment);
+
+ ObjectMapper mapper = new ObjectMapper();
+ String data = mapper.writeValueAsString(dataMap);
+
+ zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
+ } catch (JsonProcessingException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ /**
+ * The PartitionAssignor and its StreamThread need to be mutually accessible,
+ * since the former needs the latter's cached metadata while sending subscriptions,
+ * and the latter needs the former's returned assignment when adding tasks.
+ */
@Override
public void configure(Map<String, ?> configs) {
numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
@@ -68,7 +220,12 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
}
streamThread = (StreamThread) o;
- streamThread.partitionGrouper.partitionAssignor(this);
+ streamThread.partitionAssignor(this);
+
+ this.topicGroups = streamThread.builder.topicGroups();
+
+ if (configs.containsKey(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG))
+ zkClient = new ZkClient((String) configs.get(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
}
@Override
@@ -86,7 +243,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
Set<TaskId> prevTasks = streamThread.prevTasks();
Set<TaskId> standbyTasks = streamThread.cachedTasks();
standbyTasks.removeAll(prevTasks);
- SubscriptionInfo data = new SubscriptionInfo(streamThread.clientUUID, prevTasks, standbyTasks);
+ SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
return new Subscription(new ArrayList<>(topics), data.encode());
}
@@ -112,17 +269,17 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
- Set<String> consumers = consumersByClient.get(info.clientUUID);
+ Set<String> consumers = consumersByClient.get(info.processId);
if (consumers == null) {
consumers = new HashSet<>();
- consumersByClient.put(info.clientUUID, consumers);
+ consumersByClient.put(info.processId, consumers);
}
consumers.add(consumerId);
- ClientState<TaskId> state = states.get(info.clientUUID);
+ ClientState<TaskId> state = states.get(info.processId);
if (state == null) {
state = new ClientState<>();
- states.put(info.clientUUID, state);
+ states.put(info.processId, state);
}
state.prevActiveTasks.addAll(info.prevTasks);
@@ -131,21 +288,40 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
state.capacity = state.capacity + 1d;
}
- // Get partition groups from the partition grouper
- Map<TaskId, Set<TopicPartition>> partitionGroups = streamThread.partitionGrouper.partitionGroups(metadata);
+ // get the tasks as partition groups from the partition grouper
+ Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
+ for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
+ }
+ Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
+
+ // add tasks to state topic subscribers
+ stateNameToTaskIds = new HashMap<>();
+ for (TaskId task : partitionsForTask.keySet()) {
+ for (String stateName : topicGroups.get(task.topicGroupId).stateNames) {
+ Set<TaskId> tasks = stateNameToTaskIds.get(stateName);
+ if (tasks == null) {
+ tasks = new HashSet<>();
+ stateNameToTaskIds.put(stateName, tasks);
+ }
+
+ tasks.add(task);
+ }
+ }
- states = TaskAssignor.assign(states, partitionGroups.keySet(), numStandbyReplicas);
+ // assign tasks to clients
+ states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
Map<String, Assignment> assignment = new HashMap<>();
for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
- UUID uuid = entry.getKey();
+ UUID processId = entry.getKey();
Set<String> consumers = entry.getValue();
- ClientState<TaskId> state = states.get(uuid);
+ ClientState<TaskId> state = states.get(processId);
ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
final int numActiveTasks = state.activeTasks.size();
- for (TaskId id : state.activeTasks) {
- taskIds.add(id);
+ for (TaskId taskId : state.activeTasks) {
+ taskIds.add(taskId);
}
for (TaskId id : state.assignedTasks) {
if (!state.activeTasks.contains(id))
@@ -164,7 +340,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
for (int j = i; j < numTaskIds; j += numConsumers) {
TaskId taskId = taskIds.get(j);
if (j < numActiveTasks) {
- for (TopicPartition partition : partitionGroups.get(taskId)) {
+ for (TopicPartition partition : partitionsForTask.get(taskId)) {
activePartitions.add(partition);
active.add(taskId);
}
@@ -174,7 +350,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
standbyPartitions = new HashSet<>();
standby.put(taskId, standbyPartitions);
}
- standbyPartitions.addAll(partitionGroups.get(taskId));
+ standbyPartitions.addAll(partitionsForTask.get(taskId));
}
}
@@ -187,6 +363,63 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
}
}
+ // if ZK is specified, get the tasks for each state topic and validate the topic partitions
+ if (zkClient != null) {
+ log.debug("Starting to validate changelog topics in partition assignor.");
+
+ for (Map.Entry<String, Set<TaskId>> entry : stateNameToTaskIds.entrySet()) {
+ String topic = streamThread.jobId + "-" + entry.getKey() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
+
+ // the expected number of partitions is the max value of TaskId.partition + 1
+ int numPartitions = 0;
+ for (TaskId task : entry.getValue()) {
+ if (numPartitions < task.partition + 1)
+ numPartitions = task.partition + 1;
+ }
+
+ boolean topicNotReady = true;
+
+ while (topicNotReady) {
+ Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
+
+ // if topic does not exist, create it
+ if (topicMetadata == null) {
+ try {
+ createTopic(topic, numPartitions);
+ } catch (ZkNodeExistsException e) {
+ // ignore and continue
+ }
+ } else {
+ if (topicMetadata.size() > numPartitions) {
+ // else if topic exists with more #.partitions than needed, delete in order to re-create it
+ try {
+ deleteTopic(topic);
+ } catch (ZkNodeExistsException e) {
+ // ignore and continue
+ }
+ } else if (topicMetadata.size() < numPartitions) {
+ // else if topic exists with less #.partitions than needed, add partitions
+ try {
+ addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
+ } catch (ZkNoNodeException e) {
+ // ignore and continue
+ }
+ }
+
+ topicNotReady = false;
+ }
+ }
+
+ // wait until the topic metadata has been propagated to all brokers
+ List<PartitionInfo> partitions;
+ do {
+ partitions = streamThread.restoreConsumer.partitionsFor(topic);
+ } while (partitions == null || partitions.size() != numPartitions);
+ }
+
+ log.info("Completed validating changelog topics in partition assignor.");
+ }
+
return assignment;
}
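The validation step is easiest to see in condensed form: the expected partition count
of a changelog topic is the highest task partition plus one, and the topic is then
created, deleted for re-creation, or grown to match. A condensed sketch of the
per-topic decision made above:

    int numPartitions = 0;
    for (TaskId task : tasksForState)                 // the tasks writing to this state
        numPartitions = Math.max(numPartitions, task.partition + 1);

    Map<Integer, List<Integer>> metadata = getTopicMetadata(topic);
    if (metadata == null)
        createTopic(topic, numPartitions);            // missing: create, then re-check
    else if (metadata.size() > numPartitions)
        deleteTopic(topic);                           // too large: delete so it can be re-created
    else if (metadata.size() < numPartitions)
        addPartitions(topic, numPartitions - metadata.size(), metadata);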
@@ -220,7 +453,12 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
this.partitionToTaskIds = partitionToTaskIds;
}
- public Set<TaskId> taskIds(TopicPartition partition) {
+ /* For Test Only */
+ public Set<TaskId> tasksForState(String stateName) {
+ return stateNameToTaskIds.get(stateName);
+ }
+
+ public Set<TaskId> tasksForPartition(TopicPartition partition) {
return partitionToTaskIds.get(partition);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 1321cc5..3429df3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -118,11 +118,11 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
}
@Override
- public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
+ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
if (initialized)
throw new KafkaException("Can only create state stores during initialization.");
- stateMgr.register(store, stateRestoreCallback);
+ stateMgr.register(store, loggingEnabled, stateRestoreCallback);
}
@Override
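For store implementors, the extra boolean simply declares whether the store's writes
are backed by a managed changelog topic. A sketch of a store init with logging
enabled (restoreFunc stands for whatever restore callback the store uses), as in the
SlidingWindowSupplier change above:

    @Override
    public void init(ProcessorContext context) {
        // logging enabled: state is restored from "<job.id>-<store name>-changelog"
        context.register(this, true, restoreFunc);
    }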
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 4cff02d..579d245 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -19,10 +19,11 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,9 +44,11 @@ public class ProcessorStateManager {
private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
+ public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
public static final String LOCK_FILE_NAME = ".lock";
+ private final String jobId;
private final int partition;
private final File baseDir;
private final FileLock directoryLock;
@@ -55,9 +58,10 @@ public class ProcessorStateManager {
private final Map<TopicPartition, Long> checkpointedOffsets;
private final Map<TopicPartition, Long> offsetLimits;
private final boolean isStandby;
- private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks
+ private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
- public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+ public ProcessorStateManager(String jobId, int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+ this.jobId = jobId;
this.partition = partition;
this.baseDir = baseDir;
this.stores = new HashMap<>();
@@ -90,6 +94,10 @@ public class ProcessorStateManager {
}
}
+ public static String storeChangelogTopic(String jobId, String storeName) {
+ return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
+ }
+
public static FileLock lockStateDirectory(File stateDir) throws IOException {
File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
@@ -104,7 +112,7 @@ public class ProcessorStateManager {
return this.baseDir;
}
- public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
+ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
if (store.name().equals(CHECKPOINT_FILE_NAME))
throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
@@ -112,44 +120,52 @@ public class ProcessorStateManager {
throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
// check that the underlying changelog topic exists
- if (restoreConsumer.listTopics().containsKey(store.name())) {
- boolean partitionNotFound = true;
- for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(store.name())) {
+ String topic;
+ if (loggingEnabled)
+ topic = storeChangelogTopic(this.jobId, store.name());
+ else
+ topic = store.name();
+
+ // block until the partition for this state changelog topic is ready, or until the wait time has elapsed
+ boolean partitionNotFound = true;
+ long startTime = System.currentTimeMillis();
+ long waitTime = 5000L; // hard-coded, since after KIP-4 we should not need to block here
+
+ do {
+ try {
+ Thread.sleep(50L);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+ for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(topic)) {
if (partitionInfo.partition() == partition) {
partitionNotFound = false;
break;
}
}
+ } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
- if (partitionNotFound)
- throw new IllegalStateException("Store " + store.name() + "'s change log does not contain the partition " + partition);
-
- } else {
- throw new IllegalStateException("Change log topic for store " + store.name() + " does not exist yet");
- }
+ if (partitionNotFound)
+ throw new KafkaException("Store " + store.name() + "'s change log does not contain partition " + partition);
this.stores.put(store.name(), store);
if (isStandby) {
if (store.persistent())
- restoreCallbacks.put(store.name(), stateRestoreCallback);
+ restoreCallbacks.put(topic, stateRestoreCallback);
} else {
restoreActiveState(store, stateRestoreCallback);
}
}
private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) {
-
- if (store == null)
- throw new IllegalArgumentException("Store " + store.name() + " has not been registered.");
-
// ---- try to restore the state from change-log ---- //
// subscribe to the store's partition
if (!restoreConsumer.subscription().isEmpty()) {
throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
}
- TopicPartition storePartition = new TopicPartition(store.name(), partition);
+ TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId, store.name()), partition);
restoreConsumer.assign(Collections.singletonList(storePartition));
try {
@@ -195,8 +211,8 @@ public class ProcessorStateManager {
Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
- String storeName = entry.getKey();
- TopicPartition storePartition = new TopicPartition(storeName, partition);
+ String topicName = entry.getKey();
+ TopicPartition storePartition = new TopicPartition(topicName, partition);
if (checkpointedOffsets.containsKey(storePartition)) {
partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition));
@@ -212,6 +228,7 @@ public class ProcessorStateManager {
List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
// restore states from changelog records
+
StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
long lastOffset = -1L;
@@ -276,7 +293,7 @@ public class ProcessorStateManager {
Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
for (String storeName : stores.keySet()) {
- TopicPartition part = new TopicPartition(storeName, partition);
+ TopicPartition part = new TopicPartition(storeChangelogTopic(jobId, storeName), partition);
// only checkpoint the offset to the offsets file if it is persistent;
if (stores.get(storeName).persistent()) {
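The storeChangelogTopic() helper above is the heart of this change: with loggingEnabled set, a store's changelog topic is derived from the job id rather than the bare store name, which keeps two jobs that reuse a store name from writing to the same changelog, and register() now polls for up to five seconds for the topic's partition to appear instead of failing immediately. A minimal standalone sketch of the naming convention (the job and store names below are invented for illustration):

public class ChangelogNamingSketch {
    static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";

    // mirrors ProcessorStateManager.storeChangelogTopic(jobId, storeName)
    static String storeChangelogTopic(String jobId, String storeName) {
        return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    public static void main(String[] args) {
        // a store named "counts" owned by job "pageview-job" writes its changelog to:
        System.out.println(storeChangelogTopic("pageview-job", "counts"));
        // prints: pageview-job-counts-changelog
    }
}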
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index ea95300..e0583e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -110,11 +110,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
}
@Override
- public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
+ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
if (initialized)
throw new KafkaException("Can only create state stores during initialization.");
- stateMgr.register(store, stateRestoreCallback);
+ stateMgr.register(store, loggingEnabled, stateRestoreCallback);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index d0d8493..4cc4ea4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -45,19 +45,23 @@ public class StandbyTask extends AbstractTask {
* Create {@link StandbyTask} with its assigned partitions
*
* @param id the ID of this task
- * @param restoreConsumer the instance of {@link Consumer} used when restoring state
+ * @param jobId the ID of the job
+ * @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
+ * @param consumer the instance of {@link Consumer}
+ * @param restoreConsumer the instance of {@link Consumer} used when restoring state
* @param config the {@link StreamingConfig} specified by the user
* @param metrics the {@link StreamingMetrics} created by the thread
*/
public StandbyTask(TaskId id,
+ String jobId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
StreamingConfig config,
StreamingMetrics metrics) {
- super(id, partitions, topology, consumer, restoreConsumer, config, true);
+ super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
// initialize the topology with its own context
this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 24c450e..2e58ad5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -61,15 +61,17 @@ public class StreamTask extends AbstractTask implements Punctuator {
* Create {@link StreamTask} with its assigned partitions
*
* @param id the ID of this task
+ * @param jobId the ID of the job
+ * @param partitions the collection of assigned {@link TopicPartition}
+ * @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
* @param producer the instance of {@link Producer}
* @param restoreConsumer the instance of {@link Consumer} used when restoring state
- * @param partitions the collection of assigned {@link TopicPartition}
- * @param topology the instance of {@link ProcessorTopology}
* @param config the {@link StreamingConfig} specified by the user
* @param metrics the {@link StreamingMetrics} created by the thread
*/
public StreamTask(TaskId id,
+ String jobId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
@@ -77,7 +79,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
Consumer<byte[], byte[]> restoreConsumer,
StreamingConfig config,
StreamingMetrics metrics) {
- super(id, partitions, topology, consumer, restoreConsumer, config, false);
+ super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
this.punctuationQueue = new PunctuationQueue();
this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c77a027..4d1ef43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -70,7 +70,9 @@ public class StreamThread extends Thread {
private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
public final PartitionGrouper partitionGrouper;
- public final UUID clientUUID;
+ public final String jobId;
+ public final String clientId;
+ public final UUID processId;
protected final StreamingConfig config;
protected final TopologyBuilder builder;
@@ -83,7 +85,6 @@ public class StreamThread extends Thread {
private final Map<TaskId, StreamTask> activeTasks;
private final Map<TaskId, StandbyTask> standbyTasks;
private final Set<TaskId> prevTasks;
- private final String clientId;
private final Time time;
private final File stateDir;
private final long pollTimeMs;
@@ -92,6 +93,8 @@ public class StreamThread extends Thread {
private final long totalRecordsToProcess;
private final StreamingMetricsImpl sensors;
+ private KafkaStreamingPartitionAssignor partitionAssignor = null;
+
private long lastClean;
private long lastCommit;
private long recordsProcessed;
@@ -118,11 +121,12 @@ public class StreamThread extends Thread {
public StreamThread(TopologyBuilder builder,
StreamingConfig config,
+ String jobId,
String clientId,
- UUID clientUUID,
+ UUID processId,
Metrics metrics,
Time time) throws Exception {
- this(builder, config, null , null, null, clientId, clientUUID, metrics, time);
+ this(builder, config, null, null, null, jobId, clientId, processId, metrics, time);
}
StreamThread(TopologyBuilder builder,
@@ -130,19 +134,20 @@ public class StreamThread extends Thread {
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
+ String jobId,
String clientId,
- UUID clientUUID,
+ UUID processId,
Metrics metrics,
Time time) throws Exception {
super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
+ this.jobId = jobId;
this.config = config;
this.builder = builder;
this.sourceTopics = builder.sourceTopics();
this.clientId = clientId;
- this.clientUUID = clientUUID;
+ this.processId = processId;
this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
- this.partitionGrouper.topicGroups(builder.topicGroups());
// set the producer and consumer clients
this.producer = (producer != null) ? producer : createProducer();
@@ -175,23 +180,27 @@ public class StreamThread extends Thread {
this.running = new AtomicBoolean(true);
}
+ public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
+ this.partitionAssignor = partitionAssignor;
+ }
+
private Producer<byte[], byte[]> createProducer() {
log.info("Creating producer client for stream thread [" + this.getName() + "]");
- return new KafkaProducer<>(config.getProducerConfigs(),
+ return new KafkaProducer<>(config.getProducerConfigs(this.clientId),
new ByteArraySerializer(),
new ByteArraySerializer());
}
private Consumer<byte[], byte[]> createConsumer() {
log.info("Creating consumer client for stream thread [" + this.getName() + "]");
- return new KafkaConsumer<>(config.getConsumerConfigs(this),
+ return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}
private Consumer<byte[], byte[]> createRestoreConsumer() {
log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
- return new KafkaConsumer<>(config.getRestoreConsumerConfigs(),
+ return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}
@@ -516,14 +525,17 @@ public class StreamThread extends Thread {
ProcessorTopology topology = builder.build(id.topicGroupId);
- return new StreamTask(id, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
+ return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
}
private void addStreamTasks(Collection<TopicPartition> assignment) {
+ if (partitionAssignor == null)
+ throw new KafkaException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
+
HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
for (TopicPartition partition : assignment) {
- Set<TaskId> taskIds = partitionGrouper.taskIds(partition);
+ Set<TaskId> taskIds = partitionAssignor.tasksForPartition(partition);
for (TaskId taskId : taskIds) {
Set<TopicPartition> partitions = partitionsForTask.get(taskId);
if (partitions == null) {
@@ -574,17 +586,20 @@ public class StreamThread extends Thread {
ProcessorTopology topology = builder.build(id.topicGroupId);
if (!topology.stateStoreSuppliers().isEmpty()) {
- return new StandbyTask(id, partitions, topology, consumer, restoreConsumer, config, sensors);
+ return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
} else {
return null;
}
}
private void addStandbyTasks() {
+ if (partitionAssignor == null)
+ throw new KafkaException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
+
Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
// create the standby tasks
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionGrouper.standbyTasks().entrySet()) {
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.standbyTasks().entrySet()) {
TaskId taskId = entry.getKey();
Set<TopicPartition> partitions = entry.getValue();
StandbyTask task = createStandbyTask(taskId, partitions);
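With this change the thread no longer consults the partition grouper at assignment time; the injected KafkaStreamingPartitionAssignor answers which task ids own each assigned partition, and addStreamTasks() then buckets partitions per task. Below is the bucketing step in isolation, with TaskId and TopicPartition reduced to strings so the sketch stands alone (all names are invented for illustration):

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TaskBucketingSketch {
    // mirrors the loop in addStreamTasks(): one partition may belong to several tasks
    static Map<String, Set<String>> partitionsForTask(Collection<String> assignment,
                                                      Map<String, Set<String>> tasksForPartition) {
        Map<String, Set<String>> result = new HashMap<>();
        for (String partition : assignment) {
            Set<String> taskIds = tasksForPartition.get(partition);
            if (taskIds == null)
                taskIds = Collections.emptySet();
            for (String taskId : taskIds) {
                Set<String> partitions = result.get(taskId);
                if (partitions == null) {
                    partitions = new HashSet<>();
                    result.put(taskId, partitions);
                }
                partitions.add(partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> lookup = new HashMap<>();
        lookup.put("topic1-0", new HashSet<>(Arrays.asList("0_0")));
        lookup.put("topic2-0", new HashSet<>(Arrays.asList("0_0", "1_0")));
        System.out.println(partitionsForTask(Arrays.asList("topic1-0", "topic2-0"), lookup));
        // e.g. {0_0=[topic1-0, topic2-0], 1_0=[topic2-0]}
    }
}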
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 54042b9..43009a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -30,30 +30,32 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
+ private static final int CURRENT_VERSION = 1;
+
public final int version;
- public final UUID clientUUID;
+ public final UUID processId;
public final Set<TaskId> prevTasks;
public final Set<TaskId> standbyTasks;
- public SubscriptionInfo(UUID clientUUID, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
- this(1, clientUUID, prevTasks, standbyTasks);
+ public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+ this(CURRENT_VERSION, processId, prevTasks, standbyTasks);
}
- private SubscriptionInfo(int version, UUID clientUUID, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+ private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
this.version = version;
- this.clientUUID = clientUUID;
+ this.processId = processId;
this.prevTasks = prevTasks;
this.standbyTasks = standbyTasks;
}
public ByteBuffer encode() {
- if (version == 1) {
- ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
+ if (version == CURRENT_VERSION) {
+ ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
// version
- buf.putInt(1);
+ buf.putInt(version);
// encode client UUID
- buf.putLong(clientUUID.getMostSignificantBits());
- buf.putLong(clientUUID.getLeastSignificantBits());
+ buf.putLong(processId.getMostSignificantBits());
+ buf.putLong(processId.getLeastSignificantBits());
// encode ids of previously running tasks
buf.putInt(prevTasks.size());
for (TaskId id : prevTasks) {
@@ -81,9 +83,9 @@ public class SubscriptionInfo {
// Decode version
int version = data.getInt();
- if (version == 1) {
+ if (version == CURRENT_VERSION) {
// Decode client UUID
- UUID clientUUID = new UUID(data.getLong(), data.getLong());
+ UUID processId = new UUID(data.getLong(), data.getLong());
// Decode previously active tasks
Set<TaskId> prevTasks = new HashSet<>();
int numPrevs = data.getInt();
@@ -98,7 +100,7 @@ public class SubscriptionInfo {
standbyTasks.add(TaskId.readFrom(data));
}
- return new SubscriptionInfo(version, clientUUID, prevTasks, standbyTasks);
+ return new SubscriptionInfo(version, processId, prevTasks, standbyTasks);
} else {
TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version);
@@ -109,7 +111,7 @@ public class SubscriptionInfo {
@Override
public int hashCode() {
- return version ^ clientUUID.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+ return version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
}
@Override
@@ -117,7 +119,7 @@ public class SubscriptionInfo {
if (o instanceof SubscriptionInfo) {
SubscriptionInfo other = (SubscriptionInfo) o;
return this.version == other.version &&
- this.clientUUID.equals(other.clientUUID) &&
+ this.processId.equals(other.processId) &&
this.prevTasks.equals(other.prevTasks) &&
this.standbyTasks.equals(other.standbyTasks);
} else {
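The encode()/decode() pair above defines a small versioned wire format: a 4-byte version, the 16-byte process id, then two length-prefixed lists of 8-byte task ids. A self-contained round-trip sketch of that layout (task ids are flattened to plain longs here, rather than TaskId's two ints, to keep the example standalone):

import java.nio.ByteBuffer;
import java.util.UUID;

public class SubscriptionWireFormatSketch {
    public static void main(String[] args) {
        UUID processId = UUID.randomUUID();
        long[] prevTasks = {1L, 2L};
        long[] standbyTasks = {3L};

        // same layout as SubscriptionInfo.encode(): version, process id, two task lists
        ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + prevTasks.length * 8 + 4 + standbyTasks.length * 8);
        buf.putInt(1); // version
        buf.putLong(processId.getMostSignificantBits());
        buf.putLong(processId.getLeastSignificantBits());
        buf.putInt(prevTasks.length);
        for (long id : prevTasks)
            buf.putLong(id);
        buf.putInt(standbyTasks.length);
        for (long id : standbyTasks)
            buf.putLong(id);
        buf.rewind();

        // decode and verify the round trip
        int version = buf.getInt();
        UUID decoded = new UUID(buf.getLong(), buf.getLong());
        System.out.println(version == 1 && decoded.equals(processId)); // prints: true
    }
}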
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 8aed6b8..d75e7e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -88,7 +88,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
final Deserializer<V> valDeserializer = serialization.valueDeserializer();
- context.register(this, new StateRestoreCallback() {
+ context.register(this, loggingEnabled, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
inner.put(keyDeserializer.deserialize(name, key),
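The only change above is threading loggingEnabled through to register(); the restore callback's job is unchanged, replaying raw changelog bytes into the inner store via the deserializers. A tiny standalone sketch of that replay pattern (the callback interface is redeclared locally and deserialization is reduced to new String(...) so the example runs on its own):

import java.util.HashMap;
import java.util.Map;

public class RestoreCallbackSketch {
    // local stand-in for org.apache.kafka.streams.processor.StateRestoreCallback
    interface StateRestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    public static void main(String[] args) {
        final Map<String, String> inner = new HashMap<>();
        StateRestoreCallback callback = new StateRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
                // as in MeteredKeyValueStore: deserialize and write into the inner store
                inner.put(new String(key), new String(value));
            }
        };
        callback.restore("page1".getBytes(), "42".getBytes());
        System.out.println(inner); // prints: {page1=42}
    }
}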
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
index 40ca9f5..029d72f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -56,7 +56,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private Serdes<K, V> serdes;
private ProcessorContext context;
- private String dbName;
private String dirName;
private RocksDB db;