You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/01 17:20:52 UTC
kafka git commit: KAFKA-4302: Simplify KTableSource
Repository: kafka
Updated Branches:
refs/heads/0.10.1 3b0a8d6df -> baae90a1e
KAFKA-4302: Simplify KTableSource
KTableSource is always materialized since IQ:
- removed flag KTableSource#materialized
- removed MaterializedKTableSourceProcessor
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2080 from mjsax/kafka-4302-simplify-ktablesource-0.10.1
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/baae90a1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/baae90a1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/baae90a1
Branch: refs/heads/0.10.1
Commit: baae90a1e40c219d37c67167a9a65b21202f9ad6
Parents: 3b0a8d6
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Nov 1 10:19:36 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 1 10:19:36 2016 -0700
----------------------------------------------------------------------
.../kafka/streams/kstream/KStreamBuilder.java | 13 ++++++++--
.../streams/kstream/internals/KTableImpl.java | 25 --------------------
.../streams/kstream/internals/KTableSource.java | 22 +----------------
3 files changed, 12 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/baae90a1/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index f9544cc..38d126e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -22,7 +22,9 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
@@ -139,7 +141,7 @@ public class KStreamBuilder extends TopologyBuilder {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be null
- * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected
+ * @param storeName the state store name used for the materialized KTable
* @return a {@link KTable} for the specified topics
*/
public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) {
@@ -151,7 +153,14 @@ public class KStreamBuilder extends TopologyBuilder {
addProcessor(name, processorSupplier, source);
final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName);
- kTable.materialize((KTableSource) processorSupplier);
+ StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName,
+ keySerde,
+ valSerde,
+ false,
+ Collections.<String, String>emptyMap(),
+ true);
+
+ addStateStore(storeSupplier, name);
connectSourceStoreAndTopic(storeName, topic);
return kTable;
http://git-wip-us.apache.org/repos/asf/kafka/blob/baae90a1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c53e761..7ce0bbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
@@ -31,14 +30,11 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
-import java.util.Collections;
import java.util.Objects;
import java.util.Set;
@@ -388,9 +384,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
KTableValueGetterSupplier<K, V> valueGetterSupplier() {
if (processorSupplier instanceof KTableSource) {
KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
- if (!source.isMaterialized()) {
- throw new StreamsException("Source is not materialized");
- }
return new KTableSourceValueGetterSupplier<>(source.storeName);
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
@@ -404,9 +397,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
if (!sendOldValues) {
if (processorSupplier instanceof KTableSource) {
KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
- if (!source.isMaterialized()) {
- throw new StreamsException("Source is not materialized");
- }
source.enableSendingOldValues();
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
@@ -421,19 +411,4 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return sendOldValues;
}
- public void materialize(KTableSource<K, ?> source) {
- synchronized (source) {
- if (!source.isMaterialized()) {
- StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(source.storeName,
- keySerde,
- valSerde,
- false,
- Collections.<String, String>emptyMap(),
- true);
- // mark this state as non internal hence it is read directly from a user topic
- topology.addStateStore(storeSupplier, name);
- source.materialize();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/baae90a1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index d8d389f..20a80f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -29,7 +29,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
public final String storeName;
- private boolean materialized = false;
private boolean sendOldValues = false;
public KTableSource(String storeName) {
@@ -38,15 +37,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
@Override
public Processor<K, V> get() {
- return materialized ? new MaterializedKTableSourceProcessor() : new KTableSourceProcessor();
- }
-
- public void materialize() {
- materialized = true;
- }
-
- public boolean isMaterialized() {
- return materialized;
+ return new KTableSourceProcessor();
}
public void enableSendingOldValues() {
@@ -54,17 +45,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
}
private class KTableSourceProcessor extends AbstractProcessor<K, V> {
- @Override
- public void process(K key, V value) {
- // the keys should never be null
- if (key == null)
- throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null.");
-
- context().forward(key, new Change<>(value, null));
- }
- }
-
- private class MaterializedKTableSourceProcessor extends AbstractProcessor<K, V> {
private KeyValueStore<K, V> store;