You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/01 17:20:52 UTC

kafka git commit: KAFKA-4302: Simplify KTableSource

Repository: kafka
Updated Branches:
  refs/heads/0.10.1 3b0a8d6df -> baae90a1e


KAFKA-4302: Simplify KTableSource

KTableSource is always materialized since the introduction of Interactive Queries (IQ):
  - removed flag KTableSource#materialized
  - removed MaterializedKTableSourceProcessor

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2080 from mjsax/kafka-4302-simplify-ktablesource-0.10.1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/baae90a1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/baae90a1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/baae90a1

Branch: refs/heads/0.10.1
Commit: baae90a1e40c219d37c67167a9a65b21202f9ad6
Parents: 3b0a8d6
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Nov 1 10:19:36 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 1 10:19:36 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   | 13 ++++++++--
 .../streams/kstream/internals/KTableImpl.java   | 25 --------------------
 .../streams/kstream/internals/KTableSource.java | 22 +----------------
 3 files changed, 12 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/baae90a1/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index f9544cc..38d126e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -22,7 +22,9 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -139,7 +141,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param valSerde   value serde used to send key-value pairs,
      *                   if not specified the default value serde defined in the configuration will be used
      * @param topic      the topic name; cannot be null
-     * @param storeName  the state store name used if this KTable is materialized, can be null if materialization not expected
+     * @param storeName  the state store name used for the materialized KTable
      * @return a {@link KTable} for the specified topics
      */
     public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) {
@@ -151,7 +153,14 @@ public class KStreamBuilder extends TopologyBuilder {
         addProcessor(name, processorSupplier, source);
 
         final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName);
-        kTable.materialize((KTableSource) processorSupplier);
+        StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName,
+            keySerde,
+            valSerde,
+            false,
+            Collections.<String, String>emptyMap(),
+            true);
+
+        addStateStore(storeSupplier, name);
         connectSourceStoreAndTopic(storeName, topic);
 
         return kTable;

http://git-wip-us.apache.org/repos/asf/kafka/blob/baae90a1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c53e761..7ce0bbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
@@ -31,14 +30,11 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
-import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 
@@ -388,9 +384,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         if (processorSupplier instanceof KTableSource) {
             KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
-            if (!source.isMaterialized()) {
-                throw new StreamsException("Source is not materialized");
-            }
             return new KTableSourceValueGetterSupplier<>(source.storeName);
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
@@ -404,9 +397,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
-                if (!source.isMaterialized()) {
-                    throw new StreamsException("Source is not materialized");
-                }
                 source.enableSendingOldValues();
             } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
                 ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
@@ -421,19 +411,4 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return sendOldValues;
     }
 
-    public void materialize(KTableSource<K, ?> source) {
-        synchronized (source) {
-            if (!source.isMaterialized()) {
-                StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(source.storeName,
-                                                                                      keySerde,
-                                                                                      valSerde,
-                                                                                      false,
-                                                                                      Collections.<String, String>emptyMap(),
-                                                                                      true);
-                // mark this state as non internal hence it is read directly from a user topic
-                topology.addStateStore(storeSupplier, name);
-                source.materialize();
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/baae90a1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index d8d389f..20a80f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -29,7 +29,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
     public final String storeName;
 
-    private boolean materialized = false;
     private boolean sendOldValues = false;
 
     public KTableSource(String storeName) {
@@ -38,15 +37,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
     @Override
     public Processor<K, V> get() {
-        return materialized ? new MaterializedKTableSourceProcessor() : new KTableSourceProcessor();
-    }
-
-    public void materialize() {
-        materialized = true;
-    }
-
-    public boolean isMaterialized() {
-        return materialized;
+        return new KTableSourceProcessor();
     }
 
     public void enableSendingOldValues() {
@@ -54,17 +45,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
     }
 
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
-        @Override
-        public void process(K key, V value) {
-            // the keys should never be null
-            if (key == null)
-                throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null.");
-
-            context().forward(key, new Change<>(value, null));
-        }
-    }
-
-    private class MaterializedKTableSourceProcessor extends AbstractProcessor<K, V> {
 
         private KeyValueStore<K, V> store;