Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/22 01:55:34 UTC

kafka git commit: KAFKA-6214: enable use of in-memory store for standby tasks

Repository: kafka
Updated Branches:
  refs/heads/trunk cfa6a78c7 -> 225b0b9c7


KAFKA-6214: enable use of in-memory store for standby tasks

Remove the check in `ProcessorStateManager` that registered a store's restore callback for a standby task only when the store was persistent; in-memory stores can now back standby replicas as well.
Update the smoke test to use an in-memory store.
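
For illustration, a minimal sketch of a topology that exercises this path is below. The application id, bootstrap servers, topic, and store name are hypothetical placeholders, and `num.standby.replicas` is set to 1 only to show where standby replicas are enabled; this sketch is not part of the commit itself.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.Stores;

    public class InMemoryStandbyExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "in-memory-standby-example"); // hypothetical app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");         // hypothetical cluster
            // Standby replicas are what this change enables for in-memory stores.
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

            final StreamsBuilder builder = new StreamsBuilder();
            // Materialize a table in an in-memory store; its changelog topic feeds the standby replica.
            builder.table("input-topic",                                                  // hypothetical topic
                    Consumed.with(Serdes.String(), Serdes.Long()),
                    Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("counts-store"))
                            .withKeySerde(Serdes.String())
                            .withValueSerde(Serdes.Long()));

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
        }
    }

With `num.standby.replicas` above 0, the instance hosting the standby task registers the restore callback for the in-memory store's changelog and keeps a warm replica, which is what the `ProcessorStateManager` change below permits.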

Author: Damian Guy <da...@gmail.com>

Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <ma...@confluent.io>

Closes #4239 from dguy/kafka-6214


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/225b0b9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/225b0b9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/225b0b9c

Branch: refs/heads/trunk
Commit: 225b0b9c712deb3c29f8bca300ba9f73d1084e81
Parents: cfa6a78
Author: Damian Guy <da...@gmail.com>
Authored: Tue Nov 21 17:55:30 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 21 17:55:30 2017 -0800

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java          |  7 ++-----
 .../processor/internals/StandbyTaskTest.java      | 18 ++++--------------
 .../kafka/streams/tests/SmokeTestClient.java      |  9 ++++++---
 3 files changed, 12 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/225b0b9c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index cc14c67..3a2803e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -140,11 +140,8 @@ public class ProcessorStateManager implements StateManager {
         final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic));
 
         if (isStandby) {
-            if (store.persistent()) {
-                log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", store.name(), topic);
-
-                restoreCallbacks.put(topic, stateRestoreCallback);
-            }
+            log.trace("Preparing standby replica of  state store {} with changelog topic {}", store.name(), topic);
+            restoreCallbacks.put(topic, stateRestoreCallback);
         } else {
             log.trace("Restoring state store {} from changelog topic {}", store.name(), topic);
             final StateRestorer restorer = new StateRestorer(storePartition,

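For context, the callback stored in `restoreCallbacks` above is the store's `StateRestoreCallback`, which the standby task replays changelog records into. A minimal sketch of such a callback backing a plain in-memory map (a hypothetical custom store, not part of this commit) could look like:

    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.processor.StateRestoreCallback;

    // Hypothetical restore callback for an in-memory map; not part of this commit.
    public class InMemoryRestoreCallback implements StateRestoreCallback {
        private final Map<Bytes, byte[]> data = new TreeMap<>();

        @Override
        public void restore(final byte[] key, final byte[] value) {
            if (value == null) {
                data.remove(Bytes.wrap(key));      // a null value is a tombstone: remove the key
            } else {
                data.put(Bytes.wrap(key), value);  // overwrite with the latest changelog value
            }
        }
    }

Because the standby task applies changelog records in order, treating a null value as a delete keeps the in-memory replica consistent with the active store.
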
http://git-wip-us.apache.org/repos/asf/kafka/blob/225b0b9c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 396965a..8538567 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -167,8 +167,7 @@ public class StandbyTaskTest {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initialize();
-        assertEquals(Utils.mkSet(partition2), new HashSet<>(task.checkpointedOffsets().keySet()));
-
+        assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
     }
 
     @SuppressWarnings("unchecked")
@@ -190,7 +189,8 @@ public class StandbyTaskTest {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initialize();
-        restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
+        final Set<TopicPartition> partition = Collections.singleton(partition2);
+        restoreStateConsumer.assign(partition);
 
         for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
                 new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
@@ -199,16 +199,7 @@ public class StandbyTaskTest {
             restoreStateConsumer.bufferRecord(record);
         }
 
-        for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
-            TopicPartition partition = entry.getKey();
-            long offset = entry.getValue();
-            if (offset >= 0) {
-                restoreStateConsumer.seek(partition, offset);
-            } else {
-                restoreStateConsumer.seekToBeginning(singleton(partition));
-            }
-        }
-
+        restoreStateConsumer.seekToBeginning(partition);
         task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
 
         StandbyContextImpl context = (StandbyContextImpl) task.context();
@@ -228,7 +219,6 @@ public class StandbyTaskTest {
 
         assertEquals(1, offsets.size());
         assertEquals(new Long(30L + 1L), offsets.get(partition2));
-
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/225b0b9c/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index b4ed127..887f763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -27,10 +28,12 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.Stores;
 
 import java.io.File;
 import java.util.Properties;
@@ -186,7 +189,6 @@ public class SmokeTestClient extends SmokeTestUtil {
                 new Unwindow<String, Long>()
         ).to(stringSerde, longSerde, "sum");
 
-
         Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
         KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
         sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
@@ -225,8 +227,9 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).aggregate(agg.init(),
                     agg.adder(),
                     agg.remover(),
-                    longSerde,
-                    "cntByCnt"
+                    Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
+                            .withKeySerde(Serdes.String())
+                            .withValueSerde(Serdes.Long())
         ).to(stringSerde, longSerde, "tagg");
 
         final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);