Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/16 22:28:24 UTC
kafka git commit: KAFKA-2811: add standby tasks
Repository: kafka
Updated Branches:
refs/heads/trunk 356544cab -> 45e7f7130
KAFKA-2811: add standby tasks
guozhangwang
* added a new config param "num.standby.replicas" (default: 0); see the configuration sketch after this list.
* added a new abstract class AbstractTask
* added StandbyTask as a subclass of AbstractTask
* made StreamTask a subclass of AbstractTask
* StreamThread
  * standby tasks are created by calling StreamThread.addStandbyTasks() from onPartitionsAssigned()
  * standby tasks are destroyed by calling StreamThread.removeStandbyTasks() from onPartitionsRevoked()
  * In addStandbyTasks(), change log partitions are assigned to restoreConsumer.
  * In removeStandbyTasks(), change log partitions are removed from restoreConsumer.
  * StreamThread polls change log records using restoreConsumer in the runLoop with timeout=0.
  * If records are returned, StreamThread calls StandbyTask.update() to pass them to each standby task.
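For reference, a minimal configuration sketch (property names as defined in this patch; the other required streaming configs such as serializers and bootstrap servers are elided here, mirroring the new testAssignWithStandbyReplicas test below):

    Properties props = new Properties();
    // one standby replica per task; the default of 0 disables standby tasks
    props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
    StreamingConfig config = new StreamingConfig(props);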
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #526 from ymatsuda/standby_task
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45e7f713
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45e7f713
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45e7f713
Branch: refs/heads/trunk
Commit: 45e7f71309f9a3e30d25a6ddd3171c67e3e79286
Parents: 356544c
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Nov 16 13:34:42 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 16 13:34:42 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/StreamingConfig.java | 10 +
.../processor/internals/AbstractTask.java | 93 +++++++++
.../KafkaStreamingPartitionAssignor.java | 6 +-
.../internals/ProcessorStateManager.java | 103 +++++++---
.../processor/internals/StandbyContextImpl.java | 164 ++++++++++++++++
.../processor/internals/StandbyTask.java | 88 +++++++++
.../streams/processor/internals/StreamTask.java | 56 +-----
.../processor/internals/StreamThread.java | 143 ++++++++++----
.../KafkaStreamingPartitionAssignorTest.java | 174 +++++++++++++----
.../internals/ProcessorStateManagerTest.java | 93 +++++++--
.../processor/internals/StandbyTaskTest.java | 190 +++++++++++++++++++
.../processor/internals/StreamThreadTest.java | 12 +-
12 files changed, 951 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index 693cb0c..f563070 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -54,6 +54,10 @@ public class StreamingConfig extends AbstractConfig {
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
+ /** <code>num.standby.replicas</code> */
+ public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
+ private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
+
/** <code>buffered.records.per.partition</code> */
public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
@@ -136,6 +140,11 @@ public class StreamingConfig extends AbstractConfig {
1,
Importance.LOW,
NUM_STREAM_THREADS_DOC)
+ .define(NUM_STANDBY_REPLICAS_CONFIG,
+ Type.INT,
+ 0,
+ Importance.LOW,
+ NUM_STANDBY_REPLICAS_DOC)
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
Type.INT,
1000,
@@ -214,6 +223,7 @@ public class StreamingConfig extends AbstractConfig {
public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
Map<String, Object> props = getConsumerConfigs();
+ props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
return props;
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
new file mode 100644
index 0000000..64bb10d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+public abstract class AbstractTask {
+ protected final TaskId id;
+ protected final ProcessorTopology topology;
+ protected final ProcessorStateManager stateMgr;
+ protected final Set<TopicPartition> partitions;
+ protected ProcessorContext processorContext;
+
+ protected AbstractTask(TaskId id,
+ Consumer<byte[], byte[]> restoreConsumer,
+ ProcessorTopology topology,
+ StreamingConfig config,
+ Set<TopicPartition> partitions) {
+ this.id = id;
+ this.topology = topology;
+ this.partitions = partitions;
+
+ // create the processor state manager
+ try {
+ File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
+ // if partitions is null, this is a standby task
+ this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, partitions == null);
+ } catch (IOException e) {
+ throw new KafkaException("Error while creating the state manager", e);
+ }
+ }
+
+ protected void initializeStateStores() {
+ for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
+ StateStore store = stateStoreSupplier.get();
+ store.init(this.processorContext);
+ }
+ }
+
+ public final TaskId id() {
+ return id;
+ }
+
+ public final Set<TopicPartition> partitions() {
+ return this.partitions;
+ }
+
+ public final ProcessorTopology topology() {
+ return topology;
+ }
+
+ public final ProcessorContext context() {
+ return processorContext;
+ }
+
+ public abstract void commit();
+
+ public void close() {
+ try {
+ stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+ } catch (IOException e) {
+ throw new KafkaException("Error while closing the state manager in processor context", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 35ba0ec..451b214 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -46,11 +46,14 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
private StreamThread streamThread;
+ private int numStandbyReplicas;
private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
private Set<TaskId> standbyTasks;
@Override
public void configure(Map<String, ?> configs) {
+ numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
Object o = configs.get(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE);
if (o == null) {
KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -99,7 +102,6 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
// - We try not to assign the same set of tasks to two different clients
// We do the assignment in one-pass. The result may not satisfy above all.
// 2. within each client, tasks are assigned to consumer clients in round-robin manner.
-
Map<UUID, Set<String>> consumersByClient = new HashMap<>();
Map<UUID, ClientState<TaskId>> states = new HashMap<>();
@@ -132,7 +134,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
// Get partition groups from the partition grouper
Map<TaskId, Set<TopicPartition>> partitionGroups = streamThread.partitionGrouper.partitionGroups(metadata);
- states = TaskAssignor.assign(states, partitionGroups.keySet(), 0); // TODO: enable standby tasks
+ states = TaskAssignor.assign(states, partitionGroups.keySet(), numStandbyReplicas);
Map<String, Assignment> assignment = new HashMap<>();
for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3cb9cea..2a8df9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -35,6 +35,7 @@ import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class ProcessorStateManager {
@@ -51,13 +52,17 @@ public class ProcessorStateManager {
private final Consumer<byte[], byte[]> restoreConsumer;
private final Map<TopicPartition, Long> restoredOffsets;
private final Map<TopicPartition, Long> checkpointedOffsets;
+ private final boolean isStandby;
+ private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks
- public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer) throws IOException {
+ public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
this.partition = partition;
this.baseDir = baseDir;
this.stores = new HashMap<>();
this.restoreConsumer = restoreConsumer;
this.restoredOffsets = new HashMap<>();
+ this.isStandby = isStandby;
+ this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
// create the state directory for this task if missing (we won't create the parent directory)
createStateDirectory(baseDir);
@@ -103,8 +108,6 @@ public class ProcessorStateManager {
if (this.stores.containsKey(store.name()))
throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
- // ---- register the store ---- //
-
// check that the underlying change log topic exist or not
if (restoreConsumer.listTopics().containsKey(store.name())) {
boolean partitionNotFound = true;
@@ -124,48 +127,91 @@ public class ProcessorStateManager {
this.stores.put(store.name(), store);
+ if (isStandby) {
+ if (store.persistent())
+ restoreCallbacks.put(store.name(), stateRestoreCallback);
+ } else {
+ restoreActiveState(store, stateRestoreCallback);
+ }
+ }
+
+ private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) {
+
+ if (store == null)
+ throw new IllegalArgumentException("Store must not be null.");
+
// ---- try to restore the state from change-log ---- //
// subscribe to the store's partition
- TopicPartition storePartition = new TopicPartition(store.name(), partition);
if (!restoreConsumer.subscription().isEmpty()) {
throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
}
+ TopicPartition storePartition = new TopicPartition(store.name(), partition);
restoreConsumer.assign(Collections.singletonList(storePartition));
- // calculate the end offset of the partition
- // TODO: this is a bit hacky to first seek then position to get the end offset
- restoreConsumer.seekToEnd(storePartition);
- long endOffset = restoreConsumer.position(storePartition);
+ try {
+ // calculate the end offset of the partition
+ // TODO: this is a bit hacky to first seek then position to get the end offset
+ restoreConsumer.seekToEnd(storePartition);
+ long endOffset = restoreConsumer.position(storePartition);
+
+ // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
+ // restore the state from the beginning of the change log otherwise
+ if (checkpointedOffsets.containsKey(storePartition)) {
+ restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
+ } else {
+ restoreConsumer.seekToBeginning(storePartition);
+ }
- // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
- // restore the state from the beginning of the change log otherwise
- if (checkpointedOffsets.containsKey(storePartition) && store.persistent()) {
- restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
- } else {
- restoreConsumer.seekToBeginning(storePartition);
- }
+ // restore its state from changelog records; while restoring the log end offset
+ // should not change since it is only written by this thread.
+ while (true) {
+ for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
+ stateRestoreCallback.restore(record.key(), record.value());
+ }
- // restore its state from changelog records; while restoring the log end offset
- // should not change since it is only written by this thread.
- while (true) {
- for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
- stateRestoreCallback.restore(record.key(), record.value());
+ if (restoreConsumer.position(storePartition) == endOffset) {
+ break;
+ } else if (restoreConsumer.position(storePartition) > endOffset) {
+ throw new IllegalStateException("Log end offset should not change while restoring");
+ }
}
- if (restoreConsumer.position(storePartition) == endOffset) {
- break;
- } else if (restoreConsumer.position(storePartition) > endOffset) {
- throw new IllegalStateException("Log end offset should not change while restoring");
+ // record the restored offset for its change log partition
+ long newOffset = restoreConsumer.position(storePartition);
+ restoredOffsets.put(storePartition, newOffset);
+ } finally {
+ // un-assign the change log partition
+ restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+ }
+ }
+
+ public Map<TopicPartition, Long> checkpointedOffsets() {
+ Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
+
+ for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
+ String storeName = entry.getKey();
+ TopicPartition storePartition = new TopicPartition(storeName, partition);
+
+ if (checkpointedOffsets.containsKey(storePartition)) {
+ partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition));
+ } else {
+ partitionsAndOffsets.put(storePartition, -1L);
}
}
+ return partitionsAndOffsets;
+ }
+
+ public void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+ // restore states from changelog records
+ StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ restoreCallback.restore(record.key(), record.value());
+ }
// record the restored offset for its change log partition
long newOffset = restoreConsumer.position(storePartition);
restoredOffsets.put(storePartition, newOffset);
-
- // un-assign the change log partition
- restoreConsumer.assign(Collections.<TopicPartition>emptyList());
}
public StateStore getStore(String name) {
@@ -224,6 +270,9 @@ public class ProcessorStateManager {
checkpoint.write(checkpointOffsets);
}
+ // un-assign the change log partition
+ restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+
// release the state directory directoryLock
directoryLock.release();
}
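For readers skimming the diff: the restoreActiveState() logic above boils down to the following sketch (simplified; the patch additionally checks that the log end offset never moves during restoration). It uses only the consumer calls that appear in the diff:

    // 1) find the changelog end offset
    restoreConsumer.assign(Collections.singletonList(storePartition));
    restoreConsumer.seekToEnd(storePartition);
    long endOffset = restoreConsumer.position(storePartition);
    // 2) resume from the checkpoint if one exists, otherwise replay from the start
    if (checkpointedOffsets.containsKey(storePartition))
        restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
    else
        restoreConsumer.seekToBeginning(storePartition);
    // 3) replay changelog records into the store until the end offset is reached
    while (restoreConsumer.position(storePartition) < endOffset) {
        for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
    }
    // 4) release the changelog partition again
    restoreConsumer.assign(Collections.<TopicPartition>emptyList());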
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
new file mode 100644
index 0000000..ea95300
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class StandbyContextImpl implements ProcessorContext, RecordCollector.Supplier {
+
+ private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
+
+ private final TaskId id;
+ private final StreamingMetrics metrics;
+ private final ProcessorStateManager stateMgr;
+
+ private final Serializer<?> keySerializer;
+ private final Serializer<?> valSerializer;
+ private final Deserializer<?> keyDeserializer;
+ private final Deserializer<?> valDeserializer;
+
+ private boolean initialized;
+
+ public StandbyContextImpl(TaskId id,
+ StreamingConfig config,
+ ProcessorStateManager stateMgr,
+ StreamingMetrics metrics) {
+ this.id = id;
+ this.metrics = metrics;
+ this.stateMgr = stateMgr;
+
+ this.keySerializer = config.keySerializer();
+ this.valSerializer = config.valueSerializer();
+ this.keyDeserializer = config.keyDeserializer();
+ this.valDeserializer = config.valueDeserializer();
+
+ this.initialized = false;
+ }
+
+ public void initialized() {
+ this.initialized = true;
+ }
+
+ public TaskId id() {
+ return id;
+ }
+
+ public ProcessorStateManager getStateMgr() {
+ return stateMgr;
+ }
+
+ @Override
+ public RecordCollector recordCollector() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Serializer<?> keySerializer() {
+ return this.keySerializer;
+ }
+
+ @Override
+ public Serializer<?> valueSerializer() {
+ return this.valSerializer;
+ }
+
+ @Override
+ public Deserializer<?> keyDeserializer() {
+ return this.keyDeserializer;
+ }
+
+ @Override
+ public Deserializer<?> valueDeserializer() {
+ return this.valDeserializer;
+ }
+
+ @Override
+ public File stateDir() {
+ return stateMgr.baseDir();
+ }
+
+ @Override
+ public StreamingMetrics metrics() {
+ return metrics;
+ }
+
+ @Override
+ public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
+ if (initialized)
+ throw new KafkaException("Can only create state stores during initialization.");
+
+ stateMgr.register(store, stateRestoreCallback);
+ }
+
+ @Override
+ public StateStore getStateStore(String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String topic() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int partition() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long offset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long timestamp() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <K, V> void forward(K key, V value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <K, V> void forward(K key, V value, int childIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void commit() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void schedule(long interval) {
+ throw new UnsupportedOperationException();
+ }
+}
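A note on the design: a standby task only replays changelog records into its local state stores, so StandbyContextImpl implements just the plumbing that store initialization needs (state directory, serializers, register()) and throws UnsupportedOperationException for everything tied to actual record processing (topic/partition/offset/timestamp, forward(), commit(), schedule()). Processor code accidentally run against a standby context therefore fails fast instead of silently doing work that only an active task should do.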
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
new file mode 100644
index 0000000..c6442d9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A StandbyTask
+ */
+public class StandbyTask extends AbstractTask {
+
+ private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
+
+ private final Map<TopicPartition, Long> checkpointedOffsets;
+
+ /**
+ * Create {@link StandbyTask} with its assigned partitions
+ *
+ * @param id the ID of this task
+ * @param restoreConsumer the instance of {@link Consumer} used when restoring state
+ * @param topology the instance of {@link ProcessorTopology}
+ * @param config the {@link StreamingConfig} specified by the user
+ * @param metrics the {@link StreamingMetrics} created by the thread
+ */
+ public StandbyTask(TaskId id,
+ Consumer<byte[], byte[]> restoreConsumer,
+ ProcessorTopology topology,
+ StreamingConfig config,
+ StreamingMetrics metrics) {
+ super(id, restoreConsumer, topology, config, null);
+
+ // initialize the topology with its own context
+ this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
+
+ initializeStateStores();
+
+ ((StandbyContextImpl) this.processorContext).initialized();
+
+ this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
+ }
+
+ public Map<TopicPartition, Long> checkpointedOffsets() {
+ return checkpointedOffsets;
+ }
+
+ public Collection<TopicPartition> changeLogPartitions() {
+ return checkpointedOffsets.keySet();
+ }
+
+ /**
+ * Updates a state store using records from one change log partition
+ */
+ public void update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
+ stateMgr.updateStandbyStates(partition, records);
+ }
+
+ public void commit() {
+ stateMgr.flush();
+ }
+
+}
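Usage is push-based: the owning thread polls the change log partitions itself and hands the records to the task, as the StreamThread changes further down do. A minimal driving loop, mirroring that code:

    ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
    if (!records.isEmpty()) {
        for (StandbyTask task : standbyTasks.values()) {
            // feed each standby task the records for its own changelog partitions
            for (TopicPartition partition : task.changeLogPartitions()) {
                task.update(partition, records.records(partition));
            }
        }
    }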
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a9c14e5..5d170f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -21,46 +21,37 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
*/
-public class StreamTask implements Punctuator {
+public class StreamTask extends AbstractTask implements Punctuator {
private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
- private final TaskId id;
private final int maxBufferedSize;
private final Consumer consumer;
private final PartitionGroup partitionGroup;
private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
private final PunctuationQueue punctuationQueue;
- private final ProcessorContextImpl processorContext;
- private final ProcessorTopology topology;
private final Map<TopicPartition, Long> consumedOffsets;
private final RecordCollector recordCollector;
- private final ProcessorStateManager stateMgr;
private boolean commitRequested = false;
private boolean commitOffsetNeeded = false;
@@ -89,12 +80,10 @@ public class StreamTask implements Punctuator {
ProcessorTopology topology,
StreamingConfig config,
StreamingMetrics metrics) {
-
- this.id = id;
+ super(id, restoreConsumer, topology, config, Collections.unmodifiableSet(new HashSet<>(partitions)));
this.consumer = consumer;
this.punctuationQueue = new PunctuationQueue();
this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
- this.topology = topology;
// create queues for each assigned partition and associate them
// to corresponding source nodes in the processor topology
@@ -117,22 +106,11 @@ public class StreamTask implements Punctuator {
log.info("Creating restoration consumer client for stream task #" + id());
- // create the processor state manager
- try {
- File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
- this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer);
- } catch (IOException e) {
- throw new KafkaException("Error while creating the state manager", e);
- }
-
// initialize the topology with its own context
this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
// initialize the state stores
- for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
- StateStore store = stateStoreSupplier.get();
- store.init(this.processorContext);
- }
+ initializeStateStores();
// initialize the task by initializing all its processor nodes in the topology
for (ProcessorNode node : this.topology.processors()) {
@@ -144,15 +122,7 @@ public class StreamTask implements Punctuator {
}
}
- this.processorContext.initialized();
- }
-
- public TaskId id() {
- return id;
- }
-
- public Set<TopicPartition> partitions() {
- return this.partitionGroup.partitions();
+ ((ProcessorContextImpl) this.processorContext).initialized();
}
/**
@@ -261,10 +231,6 @@ public class StreamTask implements Punctuator {
return this.currNode;
}
- public ProcessorTopology topology() {
- return this.topology;
- }
-
/**
* Commit the current task state
*/
@@ -335,11 +301,7 @@ public class StreamTask implements Punctuator {
if (exception != null)
throw exception;
- try {
- stateMgr.close(recordCollector.offsets());
- } catch (IOException e) {
- throw new KafkaException("Error while closing the state manager in processor context", e);
- }
+ super.close();
}
private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
@@ -371,8 +333,4 @@ public class StreamTask implements Punctuator {
}
}
- public ProcessorContext context() {
- return processorContext;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 06e5951..bbaeb14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -78,7 +78,8 @@ public class StreamThread extends Thread {
protected final Consumer<byte[], byte[]> restoreConsumer;
private final AtomicBoolean running;
- private final Map<TaskId, StreamTask> tasks;
+ private final Map<TaskId, StreamTask> activeTasks;
+ private final Map<TaskId, StandbyTask> standbyTasks;
private final Set<TaskId> prevTasks;
private final String clientId;
private final Time time;
@@ -96,14 +97,16 @@ public class StreamThread extends Thread {
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
- addPartitions(assignment);
+ addStreamTasks(assignment);
+ addStandbyTasks();
lastClean = time.milliseconds(); // start the cleaning cycle
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
commitAll();
- removePartitions();
+ removeStreamTasks();
+ removeStandbyTasks();
lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
}
};
@@ -141,7 +144,8 @@ public class StreamThread extends Thread {
this.restoreConsumer = (restoreConsumer != null) ? restoreConsumer : createRestoreConsumer();
// initialize the task list
- this.tasks = new HashMap<>();
+ this.activeTasks = new HashMap<>();
+ this.standbyTasks = new HashMap<>();
this.prevTasks = new HashSet<>();
// read in task specific config values
@@ -208,7 +212,7 @@ public class StreamThread extends Thread {
}
public Map<TaskId, StreamTask> tasks() {
- return Collections.unmodifiableMap(tasks);
+ return Collections.unmodifiableMap(activeTasks);
}
private void shutdown() {
@@ -236,7 +240,8 @@ public class StreamThread extends Thread {
log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
}
try {
- removePartitions();
+ removeStreamTasks();
+ removeStandbyTasks();
} catch (Throwable e) {
// already logged in removePartition()
}
@@ -261,7 +266,7 @@ public class StreamThread extends Thread {
ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
if (!records.isEmpty()) {
- for (StreamTask task : tasks.values()) {
+ for (StreamTask task : activeTasks.values()) {
for (TopicPartition partition : task.partitions()) {
task.addRecords(partition, records.records(partition));
}
@@ -274,11 +279,11 @@ public class StreamThread extends Thread {
totalNumBuffered = 0;
- if (!tasks.isEmpty()) {
+ if (!activeTasks.isEmpty()) {
// try to process one record from each task
requiresPoll = false;
- for (StreamTask task : tasks.values()) {
+ for (StreamTask task : activeTasks.values()) {
long startProcess = time.milliseconds();
totalNumBuffered += task.process();
@@ -294,6 +299,10 @@ public class StreamThread extends Thread {
requiresPoll = true;
}
+ if (!standbyTasks.isEmpty()) {
+ updateStandbyTasks();
+ }
+
maybeClean();
}
} catch (Exception e) {
@@ -301,6 +310,18 @@ public class StreamThread extends Thread {
}
}
+ private void updateStandbyTasks() {
+ ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+
+ if (!records.isEmpty()) {
+ for (StandbyTask task : standbyTasks.values()) {
+ for (TopicPartition partition : task.changeLogPartitions()) {
+ task.update(partition, records.records(partition));
+ }
+ }
+ }
+ }
+
private boolean stillRunning() {
if (!running.get()) {
log.debug("Shutting down at user request.");
@@ -316,7 +337,7 @@ public class StreamThread extends Thread {
}
private void maybePunctuate() {
- for (StreamTask task : tasks.values()) {
+ for (StreamTask task : activeTasks.values()) {
try {
long now = time.milliseconds();
@@ -324,7 +345,7 @@ public class StreamThread extends Thread {
sensors.punctuateTimeSensor.record(time.milliseconds() - now);
} catch (Exception e) {
- log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+ log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
@@ -339,12 +360,12 @@ public class StreamThread extends Thread {
commitAll();
lastCommit = now;
} else {
- for (StreamTask task : tasks.values()) {
+ for (StreamTask task : activeTasks.values()) {
try {
if (task.commitNeeded())
commitOne(task, time.milliseconds());
} catch (Exception e) {
- log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+ log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
@@ -355,24 +376,22 @@ public class StreamThread extends Thread {
* Commit the states of all its tasks
*/
private void commitAll() {
- for (StreamTask task : tasks.values()) {
- try {
- commitOne(task, time.milliseconds());
- } catch (Exception e) {
- log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
+ for (StreamTask task : activeTasks.values()) {
+ commitOne(task, time.milliseconds());
+ }
+ for (StandbyTask task : standbyTasks.values()) {
+ commitOne(task, time.milliseconds());
}
}
/**
* Commit the state of a task
*/
- private void commitOne(StreamTask task, long now) {
+ private void commitOne(AbstractTask task, long now) {
try {
task.commit();
} catch (Exception e) {
- log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+ log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
@@ -426,7 +445,7 @@ public class StreamThread extends Thread {
* Returns ids of tasks that were being executed before the rebalance.
*/
public Set<TaskId> prevTasks() {
- return prevTasks;
+ return Collections.unmodifiableSet(prevTasks);
}
/**
@@ -467,8 +486,7 @@ public class StreamThread extends Thread {
return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
}
- private void addPartitions(Collection<TopicPartition> assignment) {
-
+ private void addStreamTasks(Collection<TopicPartition> assignment) {
HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
for (TopicPartition partition : assignment) {
@@ -486,9 +504,9 @@ public class StreamThread extends Thread {
// create the tasks
for (TaskId taskId : partitionsForTask.keySet()) {
try {
- tasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
+ activeTasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
} catch (Exception e) {
- log.error("Failed to create a task #" + taskId + " in thread [" + this.getName() + "]: ", e);
+ log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
@@ -496,23 +514,68 @@ public class StreamThread extends Thread {
lastClean = time.milliseconds();
}
- private void removePartitions() {
-
+ private void removeStreamTasks() {
// TODO: change this clearing tasks behavior
- for (StreamTask task : tasks.values()) {
- log.info("Removing task {}", task.id());
- try {
- task.close();
- } catch (Exception e) {
- log.error("Failed to close a task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
- sensors.taskDestructionSensor.record();
+ for (StreamTask task : activeTasks.values()) {
+ closeOne(task);
}
+
prevTasks.clear();
- prevTasks.addAll(tasks.keySet());
+ prevTasks.addAll(activeTasks.keySet());
+
+ activeTasks.clear();
+ }
+
+ private void closeOne(AbstractTask task) {
+ log.info("Removing a task {}", task.id());
+ try {
+ task.close();
+ } catch (Exception e) {
+ log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+ throw e;
+ }
+ sensors.taskDestructionSensor.record();
+ }
+
+ protected StandbyTask createStandbyTask(TaskId id) {
+ sensors.taskCreationSensor.record();
+
+ ProcessorTopology topology = builder.build(id.topicGroupId);
+
+ return new StandbyTask(id, restoreConsumer, topology, config, sensors);
+ }
+
+ private void addStandbyTasks() {
+ Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
+
+ for (TaskId taskId : partitionGrouper.standbyTasks()) {
+ StandbyTask task = createStandbyTask(taskId);
+ standbyTasks.put(taskId, task);
+ checkpointedOffsets.putAll(task.checkpointedOffsets());
+ }
+
+ restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
+
+ for (Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ long offset = entry.getValue();
+ if (offset >= 0) {
+ restoreConsumer.seek(partition, offset);
+ } else {
+ restoreConsumer.seekToBeginning(partition);
+ }
+ }
+ }
+
+
+ private void removeStandbyTasks() {
+ for (StandbyTask task : standbyTasks.values()) {
+ closeOne(task);
+ }
+ // un-assign the change log partitions
+ restoreConsumer.assign(Collections.<TopicPartition>emptyList());
- tasks.clear();
+ standbyTasks.clear();
}
private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {
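Note the seek logic in addStandbyTasks() above: StandbyTask.checkpointedOffsets() reports -1L for a change log partition without a checkpoint (see ProcessorStateManager.checkpointedOffsets() earlier in this diff), so a non-negative offset resumes restoration from the checkpoint while -1L falls back to seekToBeginning().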
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
index c0df2e7..aa484fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
@@ -17,12 +17,10 @@
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -42,7 +40,6 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -110,26 +107,6 @@ public class KafkaStreamingPartitionAssignorTest {
};
}
- private static class TestStreamTask extends StreamTask {
- public boolean committed = false;
-
- public TestStreamTask(TaskId id,
- Consumer<byte[], byte[]> consumer,
- Producer<byte[], byte[]> producer,
- Consumer<byte[], byte[]> restoreConsumer,
- Collection<TopicPartition> partitions,
- ProcessorTopology topology,
- StreamingConfig config) {
- super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
- }
-
- @Override
- public void commit() {
- super.commit();
- committed = true;
- }
- }
-
private ByteArraySerializer serializer = new ByteArraySerializer();
@SuppressWarnings("unchecked")
@@ -165,9 +142,7 @@ public class KafkaStreamingPartitionAssignorTest {
};
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(
- Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread)
- );
+ partitionAssignor.configure(config.getConsumerConfigs(thread));
PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
@@ -193,6 +168,103 @@ public class KafkaStreamingPartitionAssignorTest {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+ List<String> topics = Utils.mkList("topic1", "topic2");
+ Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+ final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+ final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+ final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+ final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+ final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
+ final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
+
+ UUID uuid1 = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
+
+ StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime());
+
+ KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(thread10));
+
+ Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ subscriptions.put("consumer10",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+ subscriptions.put("consumer11",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+ subscriptions.put("consumer20",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+
+ Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+ // check assigned partitions
+
+ assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
+ Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
+ assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
+
+ // check assignment info
+ Set<TaskId> allActiveTasks = new HashSet<>();
+ AssignmentInfo info;
+
+ List<TaskId> activeTasks = new ArrayList<>();
+ for (TopicPartition partition : assignments.get("consumer10").partitions()) {
+ activeTasks.add(new TaskId(0, partition.partition()));
+ }
+ info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+ assertEquals(activeTasks, info.activeTasks);
+ assertEquals(2, info.activeTasks.size());
+ assertEquals(1, new HashSet<>(info.activeTasks).size());
+ assertEquals(0, info.standbyTasks.size());
+
+ allActiveTasks.addAll(info.activeTasks);
+
+ activeTasks.clear();
+ for (TopicPartition partition : assignments.get("consumer11").partitions()) {
+ activeTasks.add(new TaskId(0, partition.partition()));
+ }
+ info = AssignmentInfo.decode(assignments.get("consumer11").userData());
+ assertEquals(activeTasks, info.activeTasks);
+ assertEquals(2, info.activeTasks.size());
+ assertEquals(1, new HashSet<>(info.activeTasks).size());
+ assertEquals(0, info.standbyTasks.size());
+
+ allActiveTasks.addAll(info.activeTasks);
+
+ // check active tasks assigned to the first client
+ assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
+
+ activeTasks.clear();
+ for (TopicPartition partition : assignments.get("consumer20").partitions()) {
+ activeTasks.add(new TaskId(0, partition.partition()));
+ }
+ info = AssignmentInfo.decode(assignments.get("consumer20").userData());
+ assertEquals(activeTasks, info.activeTasks);
+ assertEquals(2, info.activeTasks.size());
+ assertEquals(1, new HashSet<>(info.activeTasks).size());
+ assertEquals(0, info.standbyTasks.size());
+
+ allActiveTasks.addAll(info.activeTasks);
+
+ assertEquals(3, allActiveTasks.size());
+ assertEquals(allTasks, new HashSet<>(allActiveTasks));
+ }
+
+ @Test
+ public void testAssignWithStandbyReplicas() throws Exception {
+ Properties props = configProps();
+ props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
+ StreamingConfig config = new StreamingConfig(props);
+
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+ MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source1", "topic1");
+ builder.addSource("source2", "topic2");
+ builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+ List<String> topics = Utils.mkList("topic1", "topic2");
+ Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
@@ -207,17 +279,15 @@ public class KafkaStreamingPartitionAssignorTest {
StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime());
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(
- Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread10)
- );
+ partitionAssignor.configure(config.getConsumerConfigs(thread10));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic2"), new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic2"), new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic2"), new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -228,24 +298,54 @@ public class KafkaStreamingPartitionAssignorTest {
assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
// check assignment info
+ Set<TaskId> allActiveTasks = new HashSet<>();
+ Set<TaskId> allStandbyTasks = new HashSet<>();
+ AssignmentInfo info;
List<TaskId> activeTasks = new ArrayList<>();
for (TopicPartition partition : assignments.get("consumer10").partitions()) {
activeTasks.add(new TaskId(0, partition.partition()));
}
- assertEquals(activeTasks, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks);
+ info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+ assertEquals(activeTasks, info.activeTasks);
+ assertEquals(2, info.activeTasks.size());
+ assertEquals(1, new HashSet<>(info.activeTasks).size());
+
+ allActiveTasks.addAll(info.activeTasks);
+ allStandbyTasks.addAll(info.standbyTasks);
activeTasks.clear();
for (TopicPartition partition : assignments.get("consumer11").partitions()) {
activeTasks.add(new TaskId(0, partition.partition()));
}
- assertEquals(activeTasks, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks);
+ info = AssignmentInfo.decode(assignments.get("consumer11").userData());
+ assertEquals(activeTasks, info.activeTasks);
+ assertEquals(2, info.activeTasks.size());
+ assertEquals(1, new HashSet<>(info.activeTasks).size());
+
+ allActiveTasks.addAll(info.activeTasks);
+ allStandbyTasks.addAll(info.standbyTasks);
+
+ // check tasks assigned to the first client
+ assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
activeTasks.clear();
for (TopicPartition partition : assignments.get("consumer20").partitions()) {
activeTasks.add(new TaskId(0, partition.partition()));
}
- assertEquals(activeTasks, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks);
+ info = AssignmentInfo.decode(assignments.get("consumer20").userData());
+ assertEquals(activeTasks, info.activeTasks);
+ assertEquals(2, info.activeTasks.size());
+ assertEquals(1, new HashSet<>(info.activeTasks).size());
+
+ allActiveTasks.addAll(info.activeTasks);
+ allStandbyTasks.addAll(info.standbyTasks);
+
+ assertEquals(3, allActiveTasks.size());
+ assertEquals(allTasks, new HashSet<>(allActiveTasks));
+
+ assertEquals(3, allStandbyTasks.size());
+ assertEquals(allTasks, new HashSet<>(allStandbyTasks));
}
@Test
@@ -266,9 +366,7 @@ public class KafkaStreamingPartitionAssignorTest {
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime());
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(
- Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread)
- );
+ partitionAssignor.configure(config.getConsumerConfigs(thread));
List<TaskId> activeTaskList = Utils.mkList(task0, task3);
Set<TaskId> standbyTasks = Utils.mkSet(task1, task2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index c447f99..17bc9da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -36,7 +36,6 @@ import java.io.IOException;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -50,7 +49,7 @@ import static org.junit.Assert.assertFalse;
public class ProcessorStateManagerTest {
- private class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
+ public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
private final Serializer<Integer> serializer = new IntegerSerializer();
public TopicPartition assignedPartition = null;
@@ -79,7 +78,7 @@ public class ProcessorStateManagerTest {
recordBuffer.clear();
}
- // buffer a record (we cannot use addRecord because we need to add records before asigning a partition)
+ // buffer a record (we cannot use addRecord because we need to add records before assigning a partition)
public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
recordBuffer.add(
new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
@@ -186,7 +185,7 @@ public class ProcessorStateManagerTest {
FileLock lock;
// the state manager locks the directory
- ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
+ ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false);
try {
// this should not get the lock
@@ -215,7 +214,7 @@ public class ProcessorStateManagerTest {
try {
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
- ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
+ ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false);
try {
stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
} finally {
@@ -235,21 +234,24 @@ public class ProcessorStateManagerTest {
checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
+ restoreConsumer.updatePartitions("persistentStore", Utils.mkList(
new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0])
));
- restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
+
+ TopicPartition partition = new TopicPartition("persistentStore", 2);
+ restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
- ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false);
try {
restoreConsumer.reset();
ArrayList<Integer> expectedKeys = new ArrayList<>();
+ long offset = -1L;
for (int i = 1; i <= 3; i++) {
- long offset = (long) i;
+ offset = (long) i;
int key = i * 10;
expectedKeys.add(key);
restoreConsumer.bufferRecord(
@@ -283,21 +285,24 @@ public class ProcessorStateManagerTest {
checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
+ restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList(
new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0])
));
- restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
+
+ TopicPartition partition = new TopicPartition("persistentStore", 2);
+ restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store
- ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false);
try {
restoreConsumer.reset();
ArrayList<Integer> expectedKeys = new ArrayList<>();
+ long offset = -1L;
for (int i = 1; i <= 3; i++) {
- long offset = (long) (i + 100);
+ offset = (long) (i + 100);
int key = i;
expectedKeys.add(i);
restoreConsumer.bufferRecord(
@@ -312,6 +317,7 @@ public class ProcessorStateManagerTest {
assertTrue(restoreConsumer.seekToBeginingCalled);
assertTrue(restoreConsumer.seekToEndCalled);
assertEquals(expectedKeys, nonPersistentStore.keys);
+
} finally {
stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
@@ -321,17 +327,68 @@ public class ProcessorStateManagerTest {
}
@Test
+ public void testChangeLogOffsets() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ long lastCheckpointedOffset = 10L;
+ OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+ checkpoint.write(Collections.singletonMap(new TopicPartition("store1", 0), lastCheckpointedOffset));
+
+ MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+ restoreConsumer.updatePartitions("store1", Utils.mkList(
+ new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0])
+ ));
+ restoreConsumer.updatePartitions("store2", Utils.mkList(
+ new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0])
+ ));
+
+ TopicPartition partition1 = new TopicPartition("store1", 0);
+ TopicPartition partition2 = new TopicPartition("store2", 0);
+
+ Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(partition1, 13L);
+ endOffsets.put(partition2, 17L);
+ restoreConsumer.updateEndOffsets(endOffsets);
+
+ MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore("store1", true);
+ MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore("store2", true);
+
+ ProcessorStateManager stateMgr = new ProcessorStateManager(0, baseDir, restoreConsumer, true); // standby
+ try {
+ restoreConsumer.reset();
+
+ stateMgr.register(store1, store1.stateRestoreCallback);
+ stateMgr.register(store2, store2.stateRestoreCallback);
+
+ Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
+
+ assertEquals(2, changeLogOffsets.size());
+ assertTrue(changeLogOffsets.containsKey(partition1));
+ assertTrue(changeLogOffsets.containsKey(partition2));
+ assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1));
+ assertEquals(-1L, (long) changeLogOffsets.get(partition2));
+
+ } finally {
+ stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ @Test
public void testGetStore() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- restoreConsumer.updatePartitions("mockStore", Arrays.asList(
+ restoreConsumer.updatePartitions("mockStore", Utils.mkList(
new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
));
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
- ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false);
try {
stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
@@ -356,10 +413,10 @@ public class ProcessorStateManagerTest {
oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
+ restoreConsumer.updatePartitions("persistentStore", Utils.mkList(
new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0])
));
- restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
+ restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList(
new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0])
));
@@ -372,7 +429,7 @@ public class ProcessorStateManagerTest {
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
- ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
+ ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false);
try {
// make sure the checkpoint file is deleted
assertFalse(checkpointFile.exists());
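The new testChangeLogOffsets pins down the standby-mode contract of checkpointedOffsets(): a registered store whose change log partition has an entry in the checkpoint file reports that offset, while a store with no entry reports -1L. A sketch of that contract, with hypothetical names for state-manager internals this hunk does not show:

    // Assumed shape of the standby checkpointedOffsets() contract; the names
    // registeredPartitions and checkpointFileOffsets are hypothetical.
    Map<TopicPartition, Long> changeLogOffsets = new HashMap<>();
    for (TopicPartition tp : registeredPartitions) {
        Long checkpointed = checkpointFileOffsets.get(tp);  // from OffsetCheckpoint.read()
        changeLogOffsets.put(tp, checkpointed != null ? checkpointed : -1L);
    }

The -1L sentinel is what the seek loop in StandbyTaskTest below translates into seekToBeginning.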
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
new file mode 100644
index 0000000..b8a6990
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class StandbyTaskTest {
+
+ private final TaskId taskId = new TaskId(0, 1);
+
+ private final Serializer<Integer> intSerializer = new IntegerSerializer();
+
+ private final TopicPartition partition1 = new TopicPartition("store1", 1);
+ private final TopicPartition partition2 = new TopicPartition("store2", 1);
+
+ private final ProcessorTopology topology = new ProcessorTopology(
+ Collections.<ProcessorNode>emptyList(),
+ Collections.<String, SourceNode>emptyMap(),
+ Utils.<StateStoreSupplier>mkList(
+ new MockStateStoreSupplier(partition1.topic(), false),
+ new MockStateStoreSupplier(partition2.topic(), true)
+ )
+ );
+
+
+ private StreamingConfig createConfig(final File baseDir) throws Exception {
+ return new StreamingConfig(new Properties() {
+ {
+ setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+ setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+ setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+ setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+ }
+ });
+ }
+
+ private final ProcessorStateManagerTest.MockRestoreConsumer restoreStateConsumer = new ProcessorStateManagerTest.MockRestoreConsumer();
+
+ private final byte[] recordValue = intSerializer.serialize(null, 10);
+ private final byte[] recordKey = intSerializer.serialize(null, 1);
+
+ @Before
+ public void setup() {
+ restoreStateConsumer.reset();
+ restoreStateConsumer.updatePartitions("store1", Utils.mkList(
+ new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("store1", 1, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("store1", 2, Node.noNode(), new Node[0], new Node[0])
+ ));
+
+ restoreStateConsumer.updatePartitions("store2", Utils.mkList(
+ new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("store2", 1, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("store2", 2, Node.noNode(), new Node[0], new Node[0])
+ ));
+ }
+
+ @Test
+ public void testStorePartitions() throws Exception {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ StreamingConfig config = createConfig(baseDir);
+ StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+
+ assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = Exception.class)
+ public void testUpdateNonPersistentStore() throws Exception {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ StreamingConfig config = createConfig(baseDir);
+ StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+
+ restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+
+ task.update(partition1,
+ records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue))
+ );
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testUpdate() throws Exception {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ StreamingConfig config = createConfig(baseDir);
+ StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+
+ restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+
+ for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 1, 100),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 2, 100),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 3, 100))) {
+ restoreStateConsumer.bufferRecord(record);
+ }
+
+ for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
+ TopicPartition partition = entry.getKey();
+ long offset = entry.getValue();
+ if (offset >= 0) {
+ restoreStateConsumer.seek(partition, offset);
+ } else {
+ restoreStateConsumer.seekToBeginning(partition);
+ }
+ }
+
+ task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+
+ StandbyContextImpl context = (StandbyContextImpl) task.context();
+ MockStateStoreSupplier.MockStateStore store1 =
+ (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition1.topic());
+ MockStateStoreSupplier.MockStateStore store2 =
+ (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition2.topic());
+
+ assertEquals(Collections.emptyList(), store1.keys);
+ assertEquals(Utils.mkList(1, 2, 3), store2.keys);
+
+ task.close();
+
+ File taskDir = new File(baseDir, taskId.toString());
+ OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+ Map<TopicPartition, Long> offsets = checkpoint.read();
+
+ assertEquals(1, offsets.size());
+ assertEquals(new Long(30L + 1L), offsets.get(partition2));
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
+ return Arrays.asList(recs);
+ }
+}
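testUpdate above exercises the full standby path: seek each change log partition to its checkpointed offset (falling back to seekToBeginning for the -1L sentinel), poll the restore consumer, and hand the returned records to StandbyTask.update, which replays them through the store's registered restore callback — the store2.keys assertion verifies exactly that. Condensed to the poll/update step, assuming the same task and restoreStateConsumer declared in the test:

    // Poll the restore consumer and feed each standby partition's records to
    // the task; update() replays them via the store's restore callback.
    ConsumerRecords<byte[], byte[]> polled = restoreStateConsumer.poll(100);
    for (TopicPartition tp : task.changeLogPartitions()) {
        List<ConsumerRecord<byte[], byte[]>> recs = polled.records(tp);
        if (!recs.isEmpty())
            task.update(tp, recs);
    }

The closing assertion fixes the checkpoint written by task.close() at 31L: the offset of the last applied record (30L) plus one, i.e. the next position to resume reading from.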
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 54d0a18..02d0ac7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -167,7 +167,7 @@ public class StreamThreadTest {
}
};
- initPartitionGrouper(thread);
+ initPartitionGrouper(config, thread);
ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
@@ -292,7 +292,7 @@ public class StreamThreadTest {
}
};
- initPartitionGrouper(thread);
+ initPartitionGrouper(config, thread);
ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
@@ -414,7 +414,7 @@ public class StreamThreadTest {
}
};
- initPartitionGrouper(thread);
+ initPartitionGrouper(config, thread);
ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
@@ -467,13 +467,11 @@ public class StreamThreadTest {
}
}
- private void initPartitionGrouper(StreamThread thread) {
+ private void initPartitionGrouper(StreamingConfig config, StreamThread thread) {
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(
- Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread)
- );
+ partitionAssignor.configure(config.getConsumerConfigs(thread));
Map<String, PartitionAssignor.Assignment> assignments =
partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));