You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/22 01:55:34 UTC
kafka git commit: KAFKA-6214: enable use of in-memory store for
standby tasks
Repository: kafka
Updated Branches:
refs/heads/trunk cfa6a78c7 -> 225b0b9c7
KAFKA-6214: enable use of in-memory store for standby tasks
Remove the flag in `ProcessorStateManager` that checks if a store is persistent when registering it as a standby task.
Updated the smoke test to use an in-memory store.
Author: Damian Guy <da...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
Closes #4239 from dguy/kafka-6214
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/225b0b9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/225b0b9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/225b0b9c
Branch: refs/heads/trunk
Commit: 225b0b9c712deb3c29f8bca300ba9f73d1084e81
Parents: cfa6a78
Author: Damian Guy <da...@gmail.com>
Authored: Tue Nov 21 17:55:30 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 21 17:55:30 2017 -0800
----------------------------------------------------------------------
.../internals/ProcessorStateManager.java | 7 ++-----
.../processor/internals/StandbyTaskTest.java | 18 ++++--------------
.../kafka/streams/tests/SmokeTestClient.java | 9 ++++++---
3 files changed, 12 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/225b0b9c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index cc14c67..3a2803e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -140,11 +140,8 @@ public class ProcessorStateManager implements StateManager {
final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic));
if (isStandby) {
- if (store.persistent()) {
- log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", store.name(), topic);
-
- restoreCallbacks.put(topic, stateRestoreCallback);
- }
+ log.trace("Preparing standby replica of state store {} with changelog topic {}", store.name(), topic);
+ restoreCallbacks.put(topic, stateRestoreCallback);
} else {
log.trace("Restoring state store {} from changelog topic {}", store.name(), topic);
final StateRestorer restorer = new StateRestorer(storePartition,
http://git-wip-us.apache.org/repos/asf/kafka/blob/225b0b9c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 396965a..8538567 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -167,8 +167,7 @@ public class StandbyTaskTest {
StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
task.initialize();
- assertEquals(Utils.mkSet(partition2), new HashSet<>(task.checkpointedOffsets().keySet()));
-
+ assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
}
@SuppressWarnings("unchecked")
@@ -190,7 +189,8 @@ public class StandbyTaskTest {
StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
task.initialize();
- restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
+ final Set<TopicPartition> partition = Collections.singleton(partition2);
+ restoreStateConsumer.assign(partition);
for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
@@ -199,16 +199,7 @@ public class StandbyTaskTest {
restoreStateConsumer.bufferRecord(record);
}
- for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
- TopicPartition partition = entry.getKey();
- long offset = entry.getValue();
- if (offset >= 0) {
- restoreStateConsumer.seek(partition, offset);
- } else {
- restoreStateConsumer.seekToBeginning(singleton(partition));
- }
- }
-
+ restoreStateConsumer.seekToBeginning(partition);
task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
StandbyContextImpl context = (StandbyContextImpl) task.context();
@@ -228,7 +219,6 @@ public class StandbyTaskTest {
assertEquals(1, offsets.size());
assertEquals(new Long(30L + 1L), offsets.get(partition2));
-
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/225b0b9c/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index b4ed127..887f763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@@ -27,10 +28,12 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.Stores;
import java.io.File;
import java.util.Properties;
@@ -186,7 +189,6 @@ public class SmokeTestClient extends SmokeTestUtil {
new Unwindow<String, Long>()
).to(stringSerde, longSerde, "sum");
-
Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
@@ -225,8 +227,9 @@ public class SmokeTestClient extends SmokeTestUtil {
).aggregate(agg.init(),
agg.adder(),
agg.remover(),
- longSerde,
- "cntByCnt"
+ Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Long())
).to(stringSerde, longSerde, "tagg");
final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);