Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/04 23:59:30 UTC
[1/2] kafka git commit: KAFKA-2856: Add KTable non-stateful APIs along with standby task support
Repository: kafka
Updated Branches:
refs/heads/trunk cd54fc881 -> 39c3512ec
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index b68f763..8aed6b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -21,16 +21,11 @@ import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
@@ -39,8 +34,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final String metricGrp;
protected final Time time;
- private final String topic;
-
private Sensor putTime;
private Sensor getTime;
private Sensor deleteTime;
@@ -51,26 +44,20 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
private Sensor restoreTime;
private StreamingMetrics metrics;
- private final Set<K> dirty;
- private final Set<K> removed;
- private final int maxDirty;
- private final int maxRemoved;
-
- private int partition;
- private ProcessorContext context;
+ private boolean loggingEnabled = true;
+ private KeyValueStoreChangeLogger<K, V> changeLogger = null;
- // always wrap the logged store with the metered store
+ // always wrap the store with the metered store
public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
this.inner = inner;
this.serialization = serialization;
this.metricGrp = metricGrp;
this.time = time != null ? time : new SystemTime();
- this.topic = inner.name();
+ }
- this.dirty = new HashSet<K>();
- this.removed = new HashSet<K>();
- this.maxDirty = 100; // TODO: this needs to be configurable
- this.maxRemoved = 100; // TODO: this needs to be configurable
+ public MeteredKeyValueStore<K, V> disableLogging() {
+ loggingEnabled = false;
+ return this;
}
@Override
@@ -80,7 +67,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
@Override
public void init(ProcessorContext context) {
- String name = name();
+ final String name = name();
this.metrics = context.metrics();
this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
@@ -92,8 +79,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
serialization.init(context);
- this.context = context;
- this.partition = context.id().partition;
+ this.changeLogger = this.loggingEnabled ? new KeyValueStoreChangeLogger<>(name, context, serialization) : null;
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
@@ -105,8 +91,8 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
context.register(this, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
- inner.put(keyDeserializer.deserialize(topic, key),
- valDeserializer.deserialize(topic, value));
+ inner.put(keyDeserializer.deserialize(name, key),
+ valDeserializer.deserialize(name, value));
}
});
} finally {
@@ -135,9 +121,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
try {
this.inner.put(key, value);
- this.dirty.add(key);
- this.removed.remove(key);
- maybeLogChange();
+ if (loggingEnabled) {
+ changeLogger.add(key);
+ changeLogger.maybeLogChange(this.inner);
+ }
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
@@ -149,13 +136,13 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
try {
this.inner.putAll(entries);
- for (Entry<K, V> entry : entries) {
- K key = entry.key();
- this.dirty.add(key);
- this.removed.remove(key);
+ if (loggingEnabled) {
+ for (Entry<K, V> entry : entries) {
+ K key = entry.key();
+ changeLogger.add(key);
+ }
+ changeLogger.maybeLogChange(this.inner);
}
-
- maybeLogChange();
} finally {
this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
}
@@ -167,9 +154,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
try {
V value = this.inner.delete(key);
- this.dirty.remove(key);
- this.removed.add(key);
- maybeLogChange();
+ removed(key);
return value;
} finally {
@@ -179,14 +164,15 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
/**
* Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
- * store other than {@link #delete(Object)}.
+ * store.
*
* @param key the key for the entry that the inner store removed
*/
protected void removed(K key) {
- this.dirty.remove(key);
- this.removed.add(key);
- maybeLogChange();
+ if (loggingEnabled) {
+ changeLogger.delete(key);
+ changeLogger.maybeLogChange(this.inner);
+ }
}
@Override
@@ -209,35 +195,14 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
long startNs = time.nanoseconds();
try {
this.inner.flush();
- logChange();
+
+ if (loggingEnabled)
+ changeLogger.logChange(this.inner);
} finally {
this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
}
}
- private void maybeLogChange() {
- if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
- logChange();
- }
-
- private void logChange() {
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
- if (collector != null) {
- Serializer<K> keySerializer = serialization.keySerializer();
- Serializer<V> valueSerializer = serialization.valueSerializer();
-
- for (K k : this.removed) {
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
- }
- for (K k : this.dirty) {
- V v = this.inner.get(k);
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
- }
- this.removed.clear();
- this.dirty.clear();
- }
- }
-
private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
private final KeyValueIterator<K1, V1> iter;
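The net effect of this diff is that MeteredKeyValueStore no longer talks to the RecordCollector itself: the dirty/removed bookkeeping and the changelog writes move into the new KeyValueStoreChangeLogger, and disableLogging() lets callers opt out entirely. The logger class is added elsewhere in this commit; the following is only a minimal sketch of its likely shape, assuming it simply relocates the logic removed above.

package org.apache.kafka.streams.state;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;

import java.util.HashSet;
import java.util.Set;

// Sketch only: the real KeyValueStoreChangeLogger added by this commit may differ in detail.
public class KeyValueStoreChangeLogger<K, V> {

    private final String topic;
    private final int partition;
    private final ProcessorContext context;
    private final Serdes<K, V> serialization;

    private final Set<K> dirty = new HashSet<>();
    private final Set<K> removed = new HashSet<>();
    private final int maxDirty = 100;   // TODO: configurable, as noted in the removed code
    private final int maxRemoved = 100;

    public KeyValueStoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
        this.topic = topic;
        this.context = context;
        this.partition = context.id().partition;
        this.serialization = serialization;
    }

    public void add(K key) {
        this.dirty.add(key);
        this.removed.remove(key);
    }

    public void delete(K key) {
        this.dirty.remove(key);
        this.removed.add(key);
    }

    // Flush the accumulated changes once either set grows past its threshold.
    public void maybeLogChange(KeyValueStore<K, V> getter) {
        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
            logChange(getter);
    }

    // Deletions are sent as null values (tombstones); dirty keys are sent with their current values.
    public void logChange(KeyValueStore<K, V> getter) {
        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
        if (collector != null) {
            Serializer<K> keySerializer = serialization.keySerializer();
            Serializer<V> valueSerializer = serialization.valueSerializer();
            for (K k : this.removed)
                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
            for (K k : this.dirty)
                collector.send(new ProducerRecord<>(this.topic, this.partition, k, getter.get(k)), keySerializer, valueSerializer);
            this.removed.clear();
            this.dirty.clear();
        }
    }
}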
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
index f1fbd9f..41314b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -17,25 +17,9 @@
package org.apache.kafka.streams.state;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
/**
* A {@link KeyValueStore} that stores all entries in a local RocksDB database.
@@ -43,7 +27,7 @@ import java.util.NoSuchElementException;
* @param <K> the type of keys
* @param <V> the type of values
*
- * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
+ * @see Stores#create(String)
*/
public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
@@ -62,239 +46,7 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
- return new MeteredKeyValueStore<K, V>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+ return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
}
- private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
- private static final int TTL_NOT_USED = -1;
-
- // TODO: these values should be configurable
- private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
- private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
- private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
- private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
- private static final long BLOCK_SIZE = 4096L;
- private static final int TTL_SECONDS = TTL_NOT_USED;
- private static final int MAX_WRITE_BUFFERS = 3;
- private static final String DB_FILE_DIR = "rocksdb";
-
- private final Serdes<K, V> serdes;
- private final String topic;
-
- private final Options options;
- private final WriteOptions wOptions;
- private final FlushOptions fOptions;
-
- private ProcessorContext context;
- private int partition;
- private String dbName;
- private String dirName;
- private RocksDB db;
-
- public RocksDBStore(String name, Serdes<K, V> serdes) {
- this.topic = name;
- this.serdes = serdes;
-
- // initialize the rocksdb options
- BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
- tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
- tableConfig.setBlockSize(BLOCK_SIZE);
-
- options = new Options();
- options.setTableFormatConfig(tableConfig);
- options.setWriteBufferSize(WRITE_BUFFER_SIZE);
- options.setCompressionType(COMPRESSION_TYPE);
- options.setCompactionStyle(COMPACTION_STYLE);
- options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
- options.setCreateIfMissing(true);
- options.setErrorIfExists(false);
-
- wOptions = new WriteOptions();
- wOptions.setDisableWAL(true);
-
- fOptions = new FlushOptions();
- fOptions.setWaitForFlush(true);
- }
-
- public void init(ProcessorContext context) {
- serdes.init(context);
-
- this.context = context;
- this.partition = context.id().partition;
- this.dbName = this.topic + "." + this.partition;
- this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
- this.db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
- }
-
- private RocksDB openDB(File dir, Options options, int ttl) {
- try {
- if (ttl == TTL_NOT_USED) {
- dir.getParentFile().mkdirs();
- return RocksDB.open(options, dir.toString());
- } else {
- throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
- // TODO: support TTL with change log?
- // return TtlDB.open(options, dir.toString(), ttl, false);
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
- }
- }
-
- @Override
- public String name() {
- return this.topic;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- try {
- return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
- }
- }
-
- @Override
- public void put(K key, V value) {
- try {
- if (value == null) {
- db.remove(wOptions, serdes.rawKey(key));
- } else {
- db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
- }
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = get(key);
- put(key, null);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- RocksIterator innerIter = db.newIterator();
- innerIter.seekToFirst();
- return new RocksDbIterator<K, V>(innerIter, serdes);
- }
-
- @Override
- public void flush() {
- try {
- db.flush(fOptions);
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing flush from store " + this.topic, e);
- }
- }
-
- @Override
- public void close() {
- flush();
- db.close();
- }
-
- private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
- private final RocksIterator iter;
- private final Serdes<K, V> serdes;
-
- public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
- this.iter = iter;
- this.serdes = serdes;
- }
-
- protected byte[] peekRawKey() {
- return iter.key();
- }
-
- protected Entry<K, V> getEntry() {
- return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
- }
-
- @Override
- public boolean hasNext() {
- return iter.isValid();
- }
-
- @Override
- public Entry<K, V> next() {
- if (!hasNext())
- throw new NoSuchElementException();
-
- Entry<K, V> entry = this.getEntry();
- iter.next();
- return entry;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("RocksDB iterator does not support remove");
- }
-
- @Override
- public void close() {
- }
-
- }
-
- private static class LexicographicComparator implements Comparator<byte[]> {
-
- @Override
- public int compare(byte[] left, byte[] right) {
- for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
- int leftByte = left[i] & 0xff;
- int rightByte = right[j] & 0xff;
- if (leftByte != rightByte) {
- return leftByte - rightByte;
- }
- }
- return left.length - right.length;
- }
- }
-
- private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator = new LexicographicComparator();
- byte[] rawToKey;
-
- public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
- K from, K to) {
- super(iter, serdes);
- iter.seek(serdes.rawKey(from));
- this.rawToKey = serdes.rawKey(to);
- }
-
- @Override
- public boolean hasNext() {
- return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
- }
- }
-
- }
}
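With RocksDBStore promoted to a top-level class (the new file below), the supplier reduces to wrapping it in a MeteredKeyValueStore. A hypothetical wiring in the same pattern, assuming Integer is among the built-in types Serdes knows about and using a made-up store name; passing null for time falls back to SystemTime:

Serdes<String, Integer> serdes = Serdes.withBuiltinTypes("counts", String.class, Integer.class);
MeteredKeyValueStore<String, Integer> store =
    new MeteredKeyValueStore<>(new RocksDBStore<String, Integer>("counts", serdes), serdes, "rocksdb-state", null)
        .disableLogging();   // opt out of changelog writes; logging stays on by default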
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
new file mode 100644
index 0000000..40ca9f5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+ private static final int TTL_NOT_USED = -1;
+
+ // TODO: these values should be configurable
+ private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+ private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+ private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+ private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+ private static final long BLOCK_SIZE = 4096L;
+ private static final int TTL_SECONDS = TTL_NOT_USED;
+ private static final int MAX_WRITE_BUFFERS = 3;
+ private static final String DB_FILE_DIR = "rocksdb";
+
+ private final String topic;
+
+ private final Options options;
+ private final WriteOptions wOptions;
+ private final FlushOptions fOptions;
+
+ private Serdes<K, V> serdes;
+ private ProcessorContext context;
+ private String dbName;
+ private String dirName;
+ private RocksDB db;
+
+ public RocksDBStore(String name, Serdes<K, V> serdes) {
+ this.topic = name;
+ this.serdes = serdes;
+
+ // initialize the rocksdb options
+ BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+ tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+ tableConfig.setBlockSize(BLOCK_SIZE);
+
+ options = new Options();
+ options.setTableFormatConfig(tableConfig);
+ options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+ options.setCompressionType(COMPRESSION_TYPE);
+ options.setCompactionStyle(COMPACTION_STYLE);
+ options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+ options.setCreateIfMissing(true);
+ options.setErrorIfExists(false);
+
+ wOptions = new WriteOptions();
+ wOptions.setDisableWAL(true);
+
+ fOptions = new FlushOptions();
+ fOptions.setWaitForFlush(true);
+ }
+
+ public void init(ProcessorContext context) {
+ serdes.init(context);
+
+ this.context = context;
+ this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
+ this.db = openDB(new File(this.dirName, this.topic), this.options, TTL_SECONDS);
+ }
+
+ private RocksDB openDB(File dir, Options options, int ttl) {
+ try {
+ if (ttl == TTL_NOT_USED) {
+ dir.getParentFile().mkdirs();
+ return RocksDB.open(options, dir.toString());
+ } else {
+ throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+ // TODO: support TTL with change log?
+ // return TtlDB.open(options, dir.toString(), ttl, false);
+ }
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+ }
+ }
+
+ @Override
+ public String name() {
+ return this.topic;
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ try {
+ return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+ }
+ }
+
+ @Override
+ public void put(K key, V value) {
+ try {
+ if (value == null) {
+ db.remove(wOptions, serdes.rawKey(key));
+ } else {
+ db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+ }
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+ }
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = get(key);
+ put(key, null);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ RocksIterator innerIter = db.newIterator();
+ innerIter.seekToFirst();
+ return new RocksDbIterator<K, V>(innerIter, serdes);
+ }
+
+ @Override
+ public void flush() {
+ try {
+ db.flush(fOptions);
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing flush from store " + this.topic, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ flush();
+ db.close();
+ }
+
+ private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+ private final RocksIterator iter;
+ private final Serdes<K, V> serdes;
+
+ public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+ this.iter = iter;
+ this.serdes = serdes;
+ }
+
+ protected byte[] peekRawKey() {
+ return iter.key();
+ }
+
+ protected Entry<K, V> getEntry() {
+ return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.isValid();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Entry<K, V> entry = this.getEntry();
+ iter.next();
+ return entry;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+ }
+
+ @Override
+ public void close() {
+ }
+
+ }
+
+ private static class LexicographicComparator implements Comparator<byte[]> {
+
+ @Override
+ public int compare(byte[] left, byte[] right) {
+ for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
+ int leftByte = left[i] & 0xff;
+ int rightByte = right[j] & 0xff;
+ if (leftByte != rightByte) {
+ return leftByte - rightByte;
+ }
+ }
+ return left.length - right.length;
+ }
+ }
+
+ private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+ // RocksDB's JNI interface does not expose getters/setters that allow the
+ // comparator to be pluggable, and the default is lexicographic, so it's
+ // safe to just force lexicographic comparator here for now.
+ private final Comparator<byte[]> comparator = new LexicographicComparator();
+ byte[] rawToKey;
+
+ public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+ K from, K to) {
+ super(iter, serdes);
+ iter.seek(serdes.rawKey(from));
+ this.rawToKey = serdes.rawKey(to);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+ }
+ }
+
+}
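One behavior worth noting in the new file: RocksDBRangeIterator.hasNext() only advances while the raw key compares strictly below rawToKey, so under the byte-wise lexicographic order a range(from, to) scan is inclusive of from but exclusive of to. A hypothetical usage, assuming an already-initialized RocksDBStore<String, String> named store:

KeyValueIterator<String, String> iter = store.range("a", "c");
while (iter.hasNext()) {
    Entry<String, String> entry = iter.next();   // visits keys "a" <= key < "c"
    System.out.println(entry.key() + " -> " + entry.value());
}
iter.close();   // a no-op for this iterator, but callers should not rely on that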
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index f41d928..4e1b05a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;
-final class Serdes<K, V> {
+public final class Serdes<K, V> {
public static <K, V> Serdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
Serializer<K> keySerializer = serializer(keyClass);
@@ -73,6 +73,7 @@ final class Serdes<K, V> {
* @param valueSerializer the serializer for values; may be null
* @param valueDeserializer the deserializer for values; may be null
*/
+ @SuppressWarnings("unchecked")
public Serdes(String topic,
Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
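Making Serdes public exposes its helpers to code outside the package, such as the state stores above. A hypothetical round trip with built-in types, using the same rawValue/valueFrom helpers RocksDBStore relies on:

Serdes<String, Integer> serdes = Serdes.withBuiltinTypes("topic1", String.class, Integer.class);
byte[] raw = serdes.rawValue(42);
Integer value = serdes.valueFrom(raw);   // back to 42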
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
new file mode 100644
index 0000000..590995b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableFilterTest {
+
+ private final Serializer<String> strSerializer = new StringSerializer();
+ private final Deserializer<String> strDeserializer = new StringDeserializer();
+ private final Serializer<Integer> intSerializer = new IntegerSerializer();
+ private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
+
+ @Test
+ public void testKTable() {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KTable<String, Integer> table1 = builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+
+ KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+
+ MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+ driver.process(topic1, "A", 1);
+ driver.process(topic1, "B", 2);
+ driver.process(topic1, "C", 3);
+ driver.process(topic1, "D", 4);
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
+
+
+ assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"), proc2.processed);
+ }
+
+ @Test
+ public void testValueGetter() throws IOException {
+ File stateDir = Files.createTempDirectory("test").toFile();
+ try {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KTableImpl<String, Integer, Integer> table1 =
+ (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+ KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+
+ KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+ KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ getter2.init(driver.context());
+
+ driver.process(topic1, "A", 1);
+ driver.process(topic1, "B", 1);
+ driver.process(topic1, "C", 1);
+
+ assertNull(getter2.get("A"));
+ assertNull(getter2.get("B"));
+ assertNull(getter2.get("C"));
+
+ driver.process(topic1, "A", 2);
+ driver.process(topic1, "B", 2);
+
+ assertEquals(new Integer(2), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertNull(getter2.get("C"));
+
+ driver.process(topic1, "A", 3);
+
+ assertNull(getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertNull(getter2.get("C"));
+
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
+
+ assertNull(getter2.get("A"));
+ assertNull(getter2.get("B"));
+ assertNull(getter2.get("C"));
+
+ } finally {
+ Utils.delete(stateDir);
+ }
+ }
+
+}
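Note the expected output in testKTable above: a record whose value fails the predicate is forwarded as a null value rather than silently dropped, which is how a KTable signals a deletion (tombstone) downstream; hence "A:null" and "C:null" in proc2.processed.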
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
new file mode 100644
index 0000000..56c5703
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableImplTest {
+
+ @Test
+ public void testKTable() {
+ final Serializer<String> serializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ KTable<String, String> table1 = builder.table(serializer, serializer, deserializer, deserializer, topic1);
+
+ MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
+ table1.toStream().process(proc1);
+
+ KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+
+ MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+ KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+
+ MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
+ table3.toStream().process(proc3);
+
+ KTable<String, String> table4 = table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+ MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
+ table4.toStream().process(proc4);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "02");
+ driver.process(topic1, "C", "03");
+ driver.process(topic1, "D", "04");
+
+ assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc1.processed);
+ assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+ assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), proc3.processed);
+ assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc4.processed);
+ }
+
+ @Test
+ public void testValueGetter() throws IOException {
+ File stateDir = Files.createTempDirectory("test").toFile();
+ try {
+ final Serializer<String> serializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+ KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+ KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+ table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+ KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+ KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+ KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ getter1.init(driver.context());
+ KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ getter2.init(driver.context());
+ KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ getter3.init(driver.context());
+ KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+ getter4.init(driver.context());
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
+
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(1), getter2.get("A"));
+ assertEquals(new Integer(1), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("01", getter4.get("A"));
+ assertEquals("01", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
+
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(2), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertEquals(new Integer(2), getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("02", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "03");
+
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(3), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("03", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", null);
+
+ assertNull(getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertNull(getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertNull(getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ } finally {
+ Utils.delete(stateDir);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
new file mode 100644
index 0000000..1ca6643
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesImplTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableMapValuesImplTest {
+
+ private final Serializer<String> strSerializer = new StringSerializer();
+ private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+ @Test
+ public void testKTable() {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+ KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+
+ MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "02");
+ driver.process(topic1, "C", "03");
+ driver.process(topic1, "D", "04");
+
+ assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+ }
+
+ @Test
+ public void testValueGetter() throws IOException {
+ File stateDir = Files.createTempDirectory("test").toFile();
+ try {
+ final Serializer<String> serializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+ new ValueMapper<String, Integer>() {
+ @Override
+ public Integer apply(String value) {
+ return new Integer(value);
+ }
+ });
+ KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+ KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+ table1.through(topic2, serializer, serializer, deserializer, deserializer);
+
+ KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+ KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+ KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+ KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ getter1.init(driver.context());
+ KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ getter2.init(driver.context());
+ KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ getter3.init(driver.context());
+ KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+ getter4.init(driver.context());
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
+
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(1), getter2.get("A"));
+ assertEquals(new Integer(1), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("01", getter4.get("A"));
+ assertEquals("01", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
+
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(2), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertEquals(new Integer(2), getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("02", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", "03");
+
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(3), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("03", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.process(topic1, "A", null);
+
+ assertNull(getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertNull(getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertNull(getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ } finally {
+ Utils.delete(stateDir);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
new file mode 100644
index 0000000..97aca3d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KTableSourceTest {
+
+ private final Serializer<String> strSerializer = new StringSerializer();
+ private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+ @Test
+ public void testKTable() {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+
+ MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
+ table1.toStream().process(proc1);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+ driver.process(topic1, "A", 1);
+ driver.process(topic1, "B", 2);
+ driver.process(topic1, "C", 3);
+ driver.process(topic1, "D", 4);
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
+
+ assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), proc1.processed);
+ }
+
+ @Test
+ public void testValueGetter() throws IOException {
+ File stateDir = Files.createTempDirectory("test").toFile();
+ try {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String topic1 = "topic1";
+
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
+ builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+
+ KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+
+ KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ getter1.init(driver.context());
+
+ driver.process(topic1, "A", "01");
+ driver.process(topic1, "B", "01");
+ driver.process(topic1, "C", "01");
+
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ driver.process(topic1, "A", "02");
+ driver.process(topic1, "B", "02");
+
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ driver.process(topic1, "A", "03");
+
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ driver.process(topic1, "A", null);
+ driver.process(topic1, "B", null);
+
+ assertNull(getter1.get("A"));
+ assertNull(getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ } finally {
+ Utils.delete(stateDir);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
index aa484fc..43ffa7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
@@ -61,6 +61,8 @@ public class KafkaStreamingPartitionAssignorTest {
private TopicPartition t2p2 = new TopicPartition("topic2", 2);
private TopicPartition t2p3 = new TopicPartition("topic2", 3);
+ private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
+
private List<PartitionInfo> infos = Arrays.asList(
new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
@@ -203,47 +205,26 @@ public class KafkaStreamingPartitionAssignorTest {
assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
// check assignment info
- Set<TaskId> allActiveTasks = new HashSet<>();
- AssignmentInfo info;
-
- List<TaskId> activeTasks = new ArrayList<>();
- for (TopicPartition partition : assignments.get("consumer10").partitions()) {
- activeTasks.add(new TaskId(0, partition.partition()));
- }
- info = AssignmentInfo.decode(assignments.get("consumer10").userData());
- assertEquals(activeTasks, info.activeTasks);
- assertEquals(2, info.activeTasks.size());
- assertEquals(1, new HashSet<>(info.activeTasks).size());
- assertEquals(0, info.standbyTasks.size());
- allActiveTasks.addAll(info.activeTasks);
+ Set<TaskId> allActiveTasks = new HashSet<>();
- activeTasks.clear();
- for (TopicPartition partition : assignments.get("consumer11").partitions()) {
- activeTasks.add(new TaskId(0, partition.partition()));
- }
- info = AssignmentInfo.decode(assignments.get("consumer11").userData());
- assertEquals(activeTasks, info.activeTasks);
- assertEquals(2, info.activeTasks.size());
- assertEquals(1, new HashSet<>(info.activeTasks).size());
- assertEquals(0, info.standbyTasks.size());
+ // the first consumer
+ AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+ allActiveTasks.addAll(info10.activeTasks);
- allActiveTasks.addAll(info.activeTasks);
+ // the second consumer
+ AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+ allActiveTasks.addAll(info11.activeTasks);
// check active tasks assigned to the first client
assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
- activeTasks.clear();
- for (TopicPartition partition : assignments.get("consumer20").partitions()) {
- activeTasks.add(new TaskId(0, partition.partition()));
- }
- info = AssignmentInfo.decode(assignments.get("consumer20").userData());
- assertEquals(activeTasks, info.activeTasks);
- assertEquals(2, info.activeTasks.size());
- assertEquals(1, new HashSet<>(info.activeTasks).size());
- assertEquals(0, info.standbyTasks.size());
+ // the third consumer
+ AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+ allActiveTasks.addAll(info20.activeTasks);
- allActiveTasks.addAll(info.activeTasks);
+ assertEquals(3, allActiveTasks.size());
+ assertEquals(allTasks, new HashSet<>(allActiveTasks));
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -266,6 +247,7 @@ public class KafkaStreamingPartitionAssignorTest {
List<String> topics = Utils.mkList("topic1", "topic2");
Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
@@ -291,55 +273,29 @@ public class KafkaStreamingPartitionAssignorTest {
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
- // check assigned partitions
-
- assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
- Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
- assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
-
- // check assignment info
Set<TaskId> allActiveTasks = new HashSet<>();
Set<TaskId> allStandbyTasks = new HashSet<>();
- AssignmentInfo info;
- List<TaskId> activeTasks = new ArrayList<>();
- for (TopicPartition partition : assignments.get("consumer10").partitions()) {
- activeTasks.add(new TaskId(0, partition.partition()));
- }
- info = AssignmentInfo.decode(assignments.get("consumer10").userData());
- assertEquals(activeTasks, info.activeTasks);
- assertEquals(2, info.activeTasks.size());
- assertEquals(1, new HashSet<>(info.activeTasks).size());
+ // the first consumer
+ AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+ allActiveTasks.addAll(info10.activeTasks);
+ allStandbyTasks.addAll(info10.standbyTasks.keySet());
- allActiveTasks.addAll(info.activeTasks);
- allStandbyTasks.addAll(info.standbyTasks);
+ // the second consumer
+ AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+ allActiveTasks.addAll(info11.activeTasks);
+ allStandbyTasks.addAll(info11.standbyTasks.keySet());
- activeTasks.clear();
- for (TopicPartition partition : assignments.get("consumer11").partitions()) {
- activeTasks.add(new TaskId(0, partition.partition()));
- }
- info = AssignmentInfo.decode(assignments.get("consumer11").userData());
- assertEquals(activeTasks, info.activeTasks);
- assertEquals(2, info.activeTasks.size());
- assertEquals(1, new HashSet<>(info.activeTasks).size());
-
- allActiveTasks.addAll(info.activeTasks);
- allStandbyTasks.addAll(info.standbyTasks);
-
- // check tasks assigned to the first client
+ // check active and standby tasks assigned to the first client
assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
+ assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
- activeTasks.clear();
- for (TopicPartition partition : assignments.get("consumer20").partitions()) {
- activeTasks.add(new TaskId(0, partition.partition()));
- }
- info = AssignmentInfo.decode(assignments.get("consumer20").userData());
- assertEquals(activeTasks, info.activeTasks);
- assertEquals(2, info.activeTasks.size());
- assertEquals(1, new HashSet<>(info.activeTasks).size());
+ // the third consumer
+ AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+ allActiveTasks.addAll(info20.activeTasks);
+ allStandbyTasks.addAll(info20.standbyTasks.keySet());
- allActiveTasks.addAll(info.activeTasks);
- allStandbyTasks.addAll(info.standbyTasks);
+ // all task ids are in the active tasks and also in the standby tasks
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -348,6 +304,48 @@ public class KafkaStreamingPartitionAssignorTest {
assertEquals(allTasks, new HashSet<>(allStandbyTasks));
}
+ private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+ // This assumes that 1) the DefaultPartitionGrouper is used, and 2) there is only one topic group.
+
+ AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+ // check if the number of assigned partitions == the size of active task id list
+ assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+ // check if active tasks are consistent
+ List<TaskId> activeTasks = new ArrayList<>();
+ Set<String> activeTopics = new HashSet<>();
+ for (TopicPartition partition : assignment.partitions()) {
+ // with the default grouper, taskId.partition == partition.partition()
+ activeTasks.add(new TaskId(0, partition.partition()));
+ activeTopics.add(partition.topic());
+ }
+ assertEquals(activeTasks, info.activeTasks);
+
+ // check if active partitions cover all topics
+ assertEquals(allTopics, activeTopics);
+
+ // check if standby tasks are consistent
+ Set<String> standbyTopics = new HashSet<>();
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+ TaskId id = entry.getKey();
+ Set<TopicPartition> partitions = entry.getValue();
+ for (TopicPartition partition : partitions) {
+ // with the default grouper, taskId.partition == partition.partition()
+ assertEquals(id.partition, partition.partition());
+
+ standbyTopics.add(partition.topic());
+ }
+ }
+
+ if (info.standbyTasks.size() > 0)
+ // check if standby partitions cover all topics
+ assertEquals(allTopics, standbyTopics);
+
+ return info;
+ }
+
@Test
public void testOnAssignment() throws Exception {
StreamingConfig config = new StreamingConfig(configProps());
@@ -369,7 +367,10 @@ public class KafkaStreamingPartitionAssignorTest {
partitionAssignor.configure(config.getConsumerConfigs(thread));
List<TaskId> activeTaskList = Utils.mkList(task0, task3);
- Set<TaskId> standbyTasks = Utils.mkSet(task1, task2);
+ Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+ standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
+ standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+
AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
partitionAssignor.onAssignment(assignment);
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b8a6990..9a43e46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -18,6 +18,8 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -41,6 +43,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -53,6 +56,7 @@ public class StandbyTaskTest {
private final TopicPartition partition1 = new TopicPartition("store1", 1);
private final TopicPartition partition2 = new TopicPartition("store2", 1);
+ private final Set<TopicPartition> topicPartitions = Collections.emptySet();
private final ProcessorTopology topology = new ProcessorTopology(
Collections.<ProcessorNode>emptyList(),
Collections.<String, SourceNode>emptyMap(),
@@ -78,6 +82,7 @@ public class StandbyTaskTest {
});
}
+ private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final ProcessorStateManagerTest.MockRestoreConsumer restoreStateConsumer = new ProcessorStateManagerTest.MockRestoreConsumer();
private final byte[] recordValue = intSerializer.serialize(null, 10);
@@ -104,7 +109,7 @@ public class StandbyTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
- StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+ StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
@@ -119,7 +124,7 @@ public class StandbyTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
- StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+ StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -138,7 +143,7 @@ public class StandbyTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
- StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+ StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a95c2fa..aae5a7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -103,7 +103,7 @@ public class StreamTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(new TaskId(0, 0), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+ StreamTask task = new StreamTask(new TaskId(0, 0), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
@@ -154,7 +154,7 @@ public class StreamTaskTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(new TaskId(1, 1), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+ StreamTask task = new StreamTask(new TaskId(1, 1), partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 02d0ac7..9f31450 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -126,13 +126,13 @@ public class StreamThreadTest {
public boolean committed = false;
public TestStreamTask(TaskId id,
+ Collection<TopicPartition> partitions,
+ ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
- Collection<TopicPartition> partitions,
- ProcessorTopology topology,
StreamingConfig config) {
- super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
+ super(id, partitions, topology, consumer, producer, restoreConsumer, config, null);
}
@Override
@@ -163,7 +163,7 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
- return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
+ return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
@@ -288,7 +288,7 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
- return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
+ return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
@@ -410,7 +410,7 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build(id.topicGroupId);
- return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, topology, config);
+ return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
}
};
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
index 58e0af9..14a7f9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
@@ -17,12 +17,15 @@
package org.apache.kafka.streams.processor.internals.assignment;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.Test;
import java.util.Arrays;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -33,8 +36,10 @@ public class AssginmentInfoTest {
public void testEncodeDecode() {
List<TaskId> activeTasks =
Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
- Set<TaskId> standbyTasks =
- new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
+ Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+ standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+ standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks);
AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index ca5f33d..119f08f 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -17,28 +17,44 @@
package org.apache.kafka.test;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import java.io.File;
import java.util.List;
public class KStreamTestDriver {
private final ProcessorTopology topology;
private final MockProcessorContext context;
+ public final File stateDir;
+
private ProcessorNode currNode;
public KStreamTestDriver(KStreamBuilder builder) {
- this(builder, null, null);
+ this(builder, null, null, null, null, null);
}
- public KStreamTestDriver(KStreamBuilder builder, Serializer<?> serializer, Deserializer<?> deserializer) {
+ public KStreamTestDriver(KStreamBuilder builder,
+ File stateDir,
+ Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
+ Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
this.topology = builder.build(null);
- this.context = new MockProcessorContext(this, serializer, deserializer);
+ this.context = new MockProcessorContext(this, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
+ this.stateDir = stateDir;
+
+ for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
+ StateStore store = stateStoreSupplier.get();
+ store.init(context);
+ }
for (ProcessorNode node : topology.processors()) {
currNode = node;
@@ -50,6 +66,10 @@ public class KStreamTestDriver {
}
}
+ public ProcessorContext context() {
+ return context;
+ }
+
public void process(String topicName, Object key, Object value) {
currNode = topology.source(topicName);
try {
@@ -92,4 +112,21 @@ public class KStreamTestDriver {
}
}
+ private class MockRecordCollector extends RecordCollector {
+ public MockRecordCollector() {
+ super(null);
+ }
+
+ public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ // The serialization is skipped.
+ process(record.topic(), record.key(), record.value());
+ }
+
+ public void flush() {
+ }
+
+ public void close() {
+ }
+ }
+
}
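
With these changes the driver initializes every state store in the topology and loops sink records back through the MockRecordCollector. A minimal usage sketch follows; the topology, topic names, and string serdes are assumptions for illustration, not part of this patch:

import java.io.File;
import java.nio.file.Files;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.test.KStreamTestDriver;

KStreamBuilder builder = new KStreamBuilder();
builder.<String, String>table("topic-a").toStream().to("topic-out");

File stateDir = Files.createTempDirectory("test").toFile();
KStreamTestDriver driver = new KStreamTestDriver(
    builder, stateDir,
    new StringSerializer(), new StringDeserializer(),
    new StringSerializer(), new StringDeserializer());

// push one record through the topology
driver.process("topic-a", "key", "value");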
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 40f11a0..81a9add 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -17,6 +17,7 @@
package org.apache.kafka.test;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -109,17 +110,24 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
@Override
public File stateDir() {
- throw new UnsupportedOperationException("stateDir() not supported.");
+ return driver.stateDir;
}
@Override
public StreamingMetrics metrics() {
- throw new UnsupportedOperationException("metrics() not supported.");
+ return new StreamingMetrics() {
+ @Override
+ public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
+ return null;
+ }
+ @Override
+ public void recordLatency(Sensor sensor, long startNs, long endNs) {
+ }
+ };
}
@Override
public void register(StateStore store, StateRestoreCallback func) {
- if (func != null) throw new UnsupportedOperationException("StateRestoreCallback not supported.");
storeMap.put(store.name(), store);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 5f796c6..fdb4d57 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -158,11 +158,11 @@ public class ProcessorTopologyTestDriver {
}
task = new StreamTask(id,
+ partitionsByTopic.values(),
+ topology,
consumer,
producer,
restoreStateConsumer,
- partitionsByTopic.values(),
- topology,
config,
new StreamingMetrics() {
@Override
[2/2] kafka git commit: KAFKA-2856: Add KTable non-stateful APIs
along with standby task support
Posted by gu...@apache.org.
KAFKA-2856: Add KTable non-stateful APIs along with standby task support
guozhangwang
* added KTable API and impl
* added standby support for KTable
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #604 from ymatsuda/add_ktable
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39c3512e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39c3512e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39c3512e
Branch: refs/heads/trunk
Commit: 39c3512eceedebcb6e50f8c6c4ef66601ff7dbc4
Parents: cd54fc8
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Dec 4 14:59:24 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Dec 4 14:59:24 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KStream.java | 22 +-
.../kafka/streams/kstream/KStreamBuilder.java | 41 +++
.../apache/kafka/streams/kstream/KTable.java | 108 ++++++++
.../streams/kstream/internals/KStreamImpl.java | 2 +-
.../KTableDerivedValueGetterSupplier.java | 28 ++
.../streams/kstream/internals/KTableFilter.java | 87 ++++++
.../streams/kstream/internals/KTableImpl.java | 188 +++++++++++++
.../kstream/internals/KTableMapValues.java | 85 ++++++
.../internals/KTableProcessorSupplier.java | 26 ++
.../streams/kstream/internals/KTableSource.java | 78 ++++++
.../KTableSourceValueGetterSupplier.java | 50 ++++
.../kstream/internals/KTableStoreSupplier.java | 58 ++++
.../kstream/internals/KTableValueGetter.java | 28 ++
.../internals/KTableValueGetterSupplier.java | 24 ++
.../streams/processor/PartitionGrouper.java | 2 +-
.../apache/kafka/streams/processor/TaskId.java | 12 +
.../processor/internals/AbstractTask.java | 14 +-
.../KafkaStreamingPartitionAssignor.java | 20 +-
.../internals/ProcessorStateManager.java | 49 +++-
.../processor/internals/StandbyTask.java | 25 +-
.../streams/processor/internals/StreamTask.java | 18 +-
.../processor/internals/StreamThread.java | 78 ++++--
.../internals/assignment/AssignmentInfo.java | 129 +++++----
.../state/KeyValueStoreChangeLogger.java | 87 ++++++
.../streams/state/MeteredKeyValueStore.java | 95 +++----
.../state/RocksDBKeyValueStoreSupplier.java | 252 +-----------------
.../kafka/streams/state/RocksDBStore.java | 265 +++++++++++++++++++
.../org/apache/kafka/streams/state/Serdes.java | 3 +-
.../kstream/internals/KTableFilterTest.java | 137 ++++++++++
.../kstream/internals/KTableImplTest.java | 220 +++++++++++++++
.../internals/KTableMapValuesImplTest.java | 198 ++++++++++++++
.../kstream/internals/KTableSourceTest.java | 117 ++++++++
.../KafkaStreamingPartitionAssignorTest.java | 151 +++++------
.../processor/internals/StandbyTaskTest.java | 11 +-
.../processor/internals/StreamTaskTest.java | 4 +-
.../processor/internals/StreamThreadTest.java | 12 +-
.../assignment/AssginmentInfoTest.java | 11 +-
.../apache/kafka/test/KStreamTestDriver.java | 43 ++-
.../apache/kafka/test/MockProcessorContext.java | 14 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 4 +-
40 files changed, 2268 insertions(+), 528 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 992bd75..93303eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -33,7 +33,7 @@ public interface KStream<K, V> {
* Creates a new instance of KStream consisting of all elements of this stream which satisfy a predicate
*
* @param predicate the instance of Predicate
- * @return the stream with only those elements that satisfy the predicate
+ * @return the instance of KStream with only those elements that satisfy the predicate
*/
KStream<K, V> filter(Predicate<K, V> predicate);
@@ -41,22 +41,22 @@ public interface KStream<K, V> {
* Creates a new instance of KStream consisting of all elements of this stream which do not satisfy a predicate
*
* @param predicate the instance of Predicate
- * @return the stream with only those elements that do not satisfy the predicate
+ * @return the instance of KStream with only those elements that do not satisfy the predicate
*/
KStream<K, V> filterOut(Predicate<K, V> predicate);
/**
- * Creates a new stream by applying transforming each element in this stream into a different element in the new stream.
+ * Creates a new instance of KStream by transforming each element in this stream into a different element in the new stream.
*
* @param mapper the instance of KeyValueMapper
* @param <K1> the key type of the new stream
* @param <V1> the value type of the new stream
- * @return the mapped stream
+ * @return the instance of KStream
*/
<K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
/**
- * Creates a new instance of KStream by applying transforming each value in this stream into a different value in the new stream.
+ * Creates a new instance of KStream by transforming each value in this stream into a different value in the new stream.
*
* @param mapper the instance of ValueMapper
* @param <V1> the value type of the new stream
@@ -65,7 +65,7 @@ public interface KStream<K, V> {
<V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
/**
- * Creates a new instance of KStream by applying transforming each element in this stream into zero or more elements in the new stream.
+ * Creates a new instance of KStream by transforming each element in this stream into zero or more elements in the new stream.
*
* @param mapper the instance of KeyValueMapper
* @param <K1> the key type of the new stream
@@ -75,7 +75,7 @@ public interface KStream<K, V> {
<K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
/**
- * Creates a new instance of KStream by applying transforming each value in this stream into zero or more values in the new stream.
+ * Creates a new instance of KStream by transforming each value in this stream into zero or more values in the new stream.
*
* @param processor the instance of Processor
* @param <V1> the value type of the new stream
@@ -103,11 +103,11 @@ public interface KStream<K, V> {
KStream<K, V>[] branch(Predicate<K, V>... predicates);
/**
- * Sends key-value to a topic, also creates a new stream from the topic.
+ * Sends key-value pairs to a topic and also creates a new instance of KStream from the topic.
* This is equivalent to calling to(topic) and from(topic).
*
* @param topic the topic name
- * @return the new stream that consumes the given topic
+ * @return the instance of KStream that consumes the given topic
*/
KStream<K, V> through(String topic);
@@ -124,7 +124,7 @@ public interface KStream<K, V> {
* if not specified the default key deserializer defined in the configuration will be used
* @param valDeserializer value deserializer used to create the new KStream,
* if not specified the default value deserializer defined in the configuration will be used
- * @return the new stream that consumes the given topic
+ * @return the instance of KStream that consumes the given topic
*/
KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
@@ -160,7 +160,7 @@ public interface KStream<K, V> {
*
* @param valueTransformerSupplier the class of TransformerDef
* @param stateStoreNames the names of the state store used by the processor
- * @return the instance of KStream that contains transformed keys and values
+ * @return the instance of KStream that contains the keys and transformed values
*/
<R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index ae8f694..ca1a10d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -18,7 +18,11 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
+import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Collections;
@@ -65,6 +69,43 @@ public class KStreamBuilder extends TopologyBuilder {
}
/**
+ * Creates a KTable instance for the specified topic.
+ * The default deserializers specified in the config are used.
+ *
+ * @param topic the topic name
+ * @return KTable
+ */
+ public <K, V> KTable<K, V> table(String topic) {
+ return table(null, null, null, null, topic);
+ }
+
+ /**
+ * Creates a KTable instance for the specified topic.
+ *
+ * @param keySerializer key serializer used to send key-value pairs,
+ * if not specified the default key serializer defined in the configuration will be used
+ * @param valSerializer value serializer used to send key-value pairs,
+ * if not specified the default value serializer defined in the configuration will be used
+ * @param keyDeserializer key deserializer used to read this source KTable,
+ * if not specified the default deserializer defined in the configs will be used
+ * @param valDeserializer value deserializer used to read this source KTable,
+ * if not specified the default deserializer defined in the configs will be used
+ * @param topic the topic name
+ * @return KTable
+ */
+ public <K, V> KTable<K, V> table(Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String topic) {
+ String source = newName(KStreamImpl.SOURCE_NAME);
+ String name = newName(KTableImpl.SOURCE_NAME);
+
+ addSource(source, keyDeserializer, valDeserializer, topic);
+
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableSource<>(topic);
+ addProcessor(name, processorSupplier, source);
+
+ return new KTableImpl<>(this, name, processorSupplier, source, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer);
+ }
+
+ /**
* Creates a new stream by merging the given streams
*
* @param streams the streams to be merged
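
For illustration, a minimal sketch of how the new table() overloads might be used; the topic names and the string serializers are assumptions for the example, not part of this patch:

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// uses the default serializers/deserializers from the config
KTable<String, String> defaults = builder.table("topic-a");

// explicit serializers and deserializers
KTable<String, String> explicit = builder.table(
    new StringSerializer(), new StringSerializer(),
    new StringDeserializer(), new StringDeserializer(),
    "topic-b");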
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
new file mode 100644
index 0000000..75fb87a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * KTable is an abstraction of a change log stream.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public interface KTable<K, V> {
+
+ /**
+ * Creates a new instance of KTable consisting of all elements of this stream which satisfy a predicate
+ *
+ * @param predicate the instance of Predicate
+ * @return the instance of KTable with only those elements that satisfy the predicate
+ */
+ KTable<K, V> filter(Predicate<K, V> predicate);
+
+ /**
+ * Creates a new instance of KTable consisting of all elements of this stream which do not satisfy a predicate
+ *
+ * @param predicate the instance of Predicate
+ * @return the instance of KTable with only those elements that do not satisfy the predicate
+ */
+ KTable<K, V> filterOut(Predicate<K, V> predicate);
+
+ /**
+ * Creates a new instance of KTable by transforming each value in this stream into a different value in the new stream.
+ *
+ * @param mapper the instance of ValueMapper
+ * @param <V1> the value type of the new stream
+ * @return the instance of KTable
+ */
+ <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
+
+ /**
+ * Sends key-value pairs to a topic and also creates a new instance of KTable from the topic.
+ * This is equivalent to calling to(topic) and table(topic).
+ *
+ * @param topic the topic name
+ * @return the instance of KTable that consumes the given topic
+ */
+ KTable<K, V> through(String topic);
+
+ /**
+ * Sends key-value pairs to a topic and also creates a new instance of KTable from the topic.
+ * This is equivalent to calling to(topic) and table(topic).
+ *
+ * @param topic the topic name
+ * @param keySerializer key serializer used to send key-value pairs,
+ * if not specified the default key serializer defined in the configuration will be used
+ * @param valSerializer value serializer used to send key-value pairs,
+ * if not specified the default value serializer defined in the configuration will be used
+ * @param keyDeserializer key deserializer used to create the new KTable,
+ * if not specified the default key deserializer defined in the configuration will be used
+ * @param valDeserializer value deserializer used to create the new KTable,
+ * if not specified the default value deserializer defined in the configuration will be used
+ * @return the instance of KTable that consumes the given topic
+ */
+ KTable<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
+
+ /**
+ * Sends key-value pairs to a topic using the default serializers specified in the config.
+ *
+ * @param topic the topic name
+ */
+ void to(String topic);
+
+ /**
+ * Sends key-value pairs to a topic.
+ *
+ * @param topic the topic name
+ * @param keySerializer key serializer used to send key-value pairs,
+ * if not specified the default serializer defined in the configs will be used
+ * @param valSerializer value serializer used to send key-value pairs,
+ * if not specified the default serializer defined in the configs will be used
+ */
+ void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
+
+ /**
+ * Creates a new instance of KStream from this KTable
+ *
+ * @return the instance of KStream
+ */
+ KStream<K, V> toStream();
+
+}
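
As a sketch of the non-stateful operations the new interface offers, chained on a table read from a hypothetical topic; the predicate and mapper bodies (and the Java 7 anonymous-class style) are assumptions, using the Predicate and ValueMapper callbacks from org.apache.kafka.streams.kstream:

KTable<String, Integer> counts = builder.table("counts");

KTable<String, Integer> positive = counts.filter(new Predicate<String, Integer>() {
    @Override
    public boolean test(String key, Integer value) {
        return value > 0; // keep only positive counts
    }
});

KTable<String, String> asText = positive.mapValues(new ValueMapper<Integer, String>() {
    @Override
    public String apply(Integer value) {
        return value.toString();
    }
});

// re-emit the change log as a record stream
asText.toStream().to("counts-text");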
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 04aa8e9..fc8f4c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -59,7 +59,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
- private static final String SINK_NAME = "KSTREAM-SINK-";
+ public static final String SINK_NAME = "KSTREAM-SINK-";
public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
new file mode 100644
index 0000000..731d7f7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+public abstract class KTableDerivedValueGetterSupplier<K, V1, V2> implements KTableValueGetterSupplier<K, V2> {
+
+ protected final KTableValueGetterSupplier<K, V1> parentValueGetterSupplier;
+
+ public KTableDerivedValueGetterSupplier(KTableValueGetterSupplier<K, V1> parentValueGetterSupplier) {
+ this.parentValueGetterSupplier = parentValueGetterSupplier;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
new file mode 100644
index 0000000..212b1c9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
+
+ private final Predicate<K, V> predicate;
+ private final boolean filterOut;
+
+ public KTableFilter(Predicate<K, V> predicate, boolean filterOut) {
+ this.predicate = predicate;
+ this.filterOut = filterOut;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return new KTableFilterProcessor();
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
+ return new KTableDerivedValueGetterSupplier<K, V, V>(parentValueGetterSupplier) {
+
+ public KTableValueGetter<K, V> get() {
+ return new KTableFilterValueGetter(parentValueGetterSupplier.get());
+ }
+
+ };
+ }
+
+ private V computeNewValue(K key, V value) {
+ V newValue = null;
+
+ if (value != null && (filterOut ^ predicate.test(key, value)))
+ newValue = value;
+
+ return newValue;
+ }
+
+ private class KTableFilterProcessor extends AbstractProcessor<K, V> {
+
+ @Override
+ public void process(K key, V value) {
+ context().forward(key, computeNewValue(key, value));
+ }
+
+ }
+
+ private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
+
+ private final KTableValueGetter<K, V> parentGetter;
+
+ public KTableFilterValueGetter(KTableValueGetter<K, V> parentGetter) {
+ this.parentGetter = parentGetter;
+ }
+
+ public void init(ProcessorContext context) {
+ parentGetter.init(context);
+ }
+
+ public V get(K key) {
+ return computeNewValue(key, parentGetter.get(key));
+ }
+
+ }
+
+}
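
The filterOut ^ predicate.test(key, value) expression above folds both filter variants into one processor. A small sketch of the resulting semantics; the keys, values, and predicate are assumptions for illustration:

// filter(..):    a matching value is kept, a non-matching one becomes null
// filterOut(..): the inverse
// a null value (a delete in the change log) is always forwarded as null
Predicate<String, Integer> isEven = new Predicate<String, Integer>() {
    @Override
    public boolean test(String key, Integer value) {
        return value % 2 == 0;
    }
};
// filter(isEven):    ("k", 2) -> ("k", 2)      ("k", 3) -> ("k", null)
// filterOut(isEven): ("k", 2) -> ("k", null)   ("k", 3) -> ("k", 3)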
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
new file mode 100644
index 0000000..5b2b031
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.Collections;
+
+/**
+ * The implementation class of KTable
+ * @param <K> the key type
+ * @param <S> the source's (parent's) value type
+ * @param <V> the value type
+ */
+public class KTableImpl<K, S, V> implements KTable<K, V> {
+
+ private static final String FILTER_NAME = "KTABLE-FILTER-";
+
+ private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
+
+ private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+
+ public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+
+ protected final KStreamBuilder topology;
+ public final String name;
+ public final KTableProcessorSupplier<K, S, V> processorSupplier;
+ private final String sourceNode;
+
+ private final KTableImpl<K, ?, S> parent;
+ private final String topic;
+ private final Serializer<K> keySerializer;
+ private final Serializer<V> valSerializer;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valDeserializer;
+
+ public KTableImpl(KStreamBuilder topology,
+ String name,
+ KTableProcessorSupplier<K, S, V> processorSupplier,
+ String sourceNode,
+ KTableImpl<K, ?, S> parent) {
+ this(topology, name, processorSupplier, sourceNode, null, null, null, null, null, parent);
+ }
+
+ public KTableImpl(KStreamBuilder topology,
+ String name,
+ KTableProcessorSupplier<K, S, V> processorSupplier,
+ String sourceNode,
+ String topic,
+ Serializer<K> keySerializer,
+ Serializer<V> valSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valDeserializer) {
+ this(topology, name, processorSupplier, sourceNode, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer, null);
+ }
+
+ private KTableImpl(KStreamBuilder topology,
+ String name,
+ KTableProcessorSupplier<K, S, V> processorSupplier,
+ String sourceNode,
+ String topic,
+ Serializer<K> keySerializer,
+ Serializer<V> valSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valDeserializer,
+ KTableImpl<K, ?, S> parent) {
+ this.topology = topology;
+ this.name = name;
+ this.processorSupplier = processorSupplier;
+ this.sourceNode = sourceNode;
+ this.topic = topic;
+ this.keySerializer = keySerializer;
+ this.valSerializer = valSerializer;
+ this.keyDeserializer = keyDeserializer;
+ this.valDeserializer = valDeserializer;
+ this.parent = parent;
+ }
+
+ @Override
+ public KTable<K, V> filter(Predicate<K, V> predicate) {
+ String name = topology.newName(FILTER_NAME);
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, false);
+ topology.addProcessor(name, processorSupplier, this.name);
+
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+ }
+
+ @Override
+ public KTable<K, V> filterOut(final Predicate<K, V> predicate) {
+ String name = topology.newName(FILTER_NAME);
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, true);
+
+ topology.addProcessor(name, processorSupplier, this.name);
+
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+ }
+
+ @Override
+ public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+ String name = topology.newName(MAPVALUES_NAME);
+ KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(mapper);
+
+ topology.addProcessor(name, processorSupplier, this.name);
+
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+ }
+
+ @Override
+ public KTable<K, V> through(String topic,
+ Serializer<K> keySerializer,
+ Serializer<V> valSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valDeserializer) {
+ String sendName = topology.newName(KStreamImpl.SINK_NAME);
+
+ topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
+
+ return topology.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic);
+ }
+
+ @Override
+ public KTable<K, V> through(String topic) {
+ return through(topic, null, null, null, null);
+ }
+
+ @Override
+ public void to(String topic) {
+ String name = topology.newName(KStreamImpl.SINK_NAME);
+
+ topology.addSink(name, topic, this.name);
+ }
+
+ @Override
+ public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
+ String name = topology.newName(KStreamImpl.SINK_NAME);
+
+ topology.addSink(name, topic, keySerializer, valSerializer, this.name);
+ }
+
+ @Override
+ public KStream<K, V> toStream() {
+ String name = topology.newName(TOSTREAM_NAME);
+
+ topology.addProcessor(name, new KStreamPassThrough(), this.name);
+
+ return new KStreamImpl<>(topology, name, Collections.singleton(sourceNode));
+ }
+
+ KTableValueGetterSupplier<K, V> valueGetterSupplier() {
+ if (parent != null) {
+ return processorSupplier.view(parent.valueGetterSupplier());
+ } else {
+ KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+ synchronized (source) {
+ if (!source.isMaterialized()) {
+ StateStoreSupplier storeSupplier =
+ new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+ topology.addStateStore(storeSupplier, name);
+ source.materialize();
+ }
+ }
+ return new KTableSourceValueGetterSupplier<>(topic);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
new file mode 100644
index 0000000..0d14390
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
+
+ private final ValueMapper<V1, V2> mapper;
+
+ public KTableMapValues(ValueMapper<V1, V2> mapper) {
+ this.mapper = mapper;
+ }
+
+ @Override
+ public Processor<K1, V1> get() {
+ return new KTableMapProcessor();
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K1, V2> view(KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier) {
+ return new KTableDerivedValueGetterSupplier<K1, V1, V2>(parentValueGetterSupplier) {
+
+ public KTableValueGetter<K1, V2> get() {
+ return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
+ }
+
+ };
+ }
+
+ private V2 computeNewValue(V1 value) {
+ V2 newValue = null;
+
+ if (value != null)
+ newValue = mapper.apply(value);
+
+ return newValue;
+ }
+
+ private class KTableMapProcessor extends AbstractProcessor<K1, V1> {
+
+ @Override
+ public void process(K1 key, V1 value) {
+ context().forward(key, computeNewValue(value));
+ }
+
+ }
+
+ private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {
+
+ private final KTableValueGetter<K1, V1> parentGetter;
+
+ public KTableMapValuesValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+ this.parentGetter = parentGetter;
+ }
+
+ public void init(ProcessorContext context) {
+ parentGetter.init(context);
+ }
+
+ public V2 get(K1 key) {
+ return computeNewValue(parentGetter.get(key));
+ }
+
+ }
+
+}
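
Note that computeNewValue only invokes the mapper for non-null values, so deletes in the change log propagate as nulls rather than being passed to user code. A sketch, with the mapper and records assumed:

ValueMapper<Integer, String> toHex = new ValueMapper<Integer, String>() {
    @Override
    public String apply(Integer value) {
        return Integer.toHexString(value);
    }
};
// ("k", 255)  -> ("k", "ff")
// ("k", null) -> ("k", null)   // the mapper is never called for a delete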
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
new file mode 100644
index 0000000..cc6467f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public abstract class KTableProcessorSupplier<K, V, T> implements ProcessorSupplier<K, V> {
+
+ public abstract KTableValueGetterSupplier<K, T> view(KTableValueGetterSupplier<K, V> parentValueGetterFactory);
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
new file mode 100644
index 0000000..93790ed
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableSource<K, V> extends KTableProcessorSupplier<K, V, V> {
+
+ private final String topic;
+
+ private boolean materialized = false;
+
+ public KTableSource(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return materialized ? new MaterializedKTableSourceProcessor() : new KTableSourceProcessor();
+ }
+
+ public void materialize() {
+ materialized = true;
+ }
+
+ public boolean isMaterialized() {
+ return materialized;
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
+ throw new IllegalStateException("a view cannot be defined on the KTable source");
+ }
+
+ private class KTableSourceProcessor extends AbstractProcessor<K, V> {
+ @Override
+ public void process(K key, V value) {
+ context().forward(key, value);
+ }
+ }
+
+ private class MaterializedKTableSourceProcessor extends AbstractProcessor<K, V> {
+
+ private KeyValueStore<K, V> store;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+ store = (KeyValueStore<K, V>) context.getStateStore(topic);
+ }
+
+ @Override
+ public void process(K key, V value) {
+ store.put(key, value);
+ context().forward(key, value);
+ }
+ }
+
+}
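
The source starts out as a plain pass-through and only begins writing to the state store once materialize() has been called, which KTableImpl.valueGetterSupplier() triggers lazily. A minimal sketch, with the topic name assumed:

KTableSource<String, String> source = new KTableSource<>("topic-a");

Processor<String, String> forwardOnly = source.get();   // only forwards key-value pairs

source.materialize();

Processor<String, String> materializing = source.get(); // also puts each pair into the
                                                        // state store named "topic-a"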
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
new file mode 100644
index 0000000..dab92d5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
+
+ private final String topic;
+
+ public KTableSourceValueGetterSupplier(String topic) {
+ this.topic = topic;
+ }
+
+ public KTableValueGetter<K, V> get() {
+ return new KTableSourceValueGetter();
+ }
+
+ private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
+
+ KeyValueStore<K, V> store = null;
+
+ @SuppressWarnings("unchecked")
+ public void init(ProcessorContext context) {
+ store = (KeyValueStore<K, V>) context.getStateStore(topic);
+ }
+
+ public V get(K key) {
+ return store.get(key);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
new file mode 100644
index 0000000..d07fc5d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.RocksDBStore;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A KTable storage supplier. The supplied store keeps all entries in a local
+ * RocksDB database, with change logging disabled since the table's source topic
+ * already serves as its changelog.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final Serdes<K, V> serdes;
+ private final Time time;
+
+ protected KTableStoreSupplier(String name,
+ Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
+ Serializer<V> valSerializer, Deserializer<V> valDeserializer,
+ Time time) {
+ this.name = name;
+ this.serdes = new Serdes<>(name, keySerializer, keyDeserializer, valSerializer, valDeserializer);
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), serdes, "rocksdb-state", time).disableLogging();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
new file mode 100644
index 0000000..53ec6ba
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public interface KTableValueGetter<K, V> {
+
+ void init(ProcessorContext context);
+
+ V get(K key);
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
new file mode 100644
index 0000000..1ab6ba6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetterSupplier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+public interface KTableValueGetterSupplier<K, V> {
+
+ KTableValueGetter<K, V> get();
+
+}
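
To make the contract of these two small interfaces concrete, here is a toy supplier backed by a plain map; the real KTableSourceValueGetterSupplier above does the same, except its init() resolves a KeyValueStore from the ProcessorContext. This is a sketch for illustration, not part of the patch.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
    import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // toy implementation: point lookups against a plain map
    public class MapValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
        private final Map<K, V> table = new HashMap<>();

        public Map<K, V> table() {
            return table;
        }

        @Override
        public KTableValueGetter<K, V> get() {
            return new KTableValueGetter<K, V>() {
                public void init(ProcessorContext context) {
                    // nothing to resolve here; the real getter fetches its
                    // KeyValueStore from the context in this step
                }

                public V get(K key) {
                    return table.get(key);
                }
            };
        }
    }
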
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 00b56b3..187c4ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -50,7 +50,7 @@ public abstract class PartitionGrouper {
return partitionAssignor.taskIds(partition);
}
- public Set<TaskId> standbyTasks() {
+ public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return partitionAssignor.standbyTasks();
}
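
The changed return type matters to callers: a standby task now arrives with the set of changelog partitions it must restore from, not just its id. A sketch of consuming the new shape (the grouper instance and the printing are assumptions):

    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.PartitionGrouper;
    import org.apache.kafka.streams.processor.TaskId;

    public class StandbyTasksSketch {
        // each standby task id now maps to the changelog partitions it restores from
        static void printStandbyAssignments(PartitionGrouper grouper) {
            for (Map.Entry<TaskId, Set<TopicPartition>> entry : grouper.standbyTasks().entrySet()) {
                System.out.println(entry.getKey() + " restores from " + entry.getValue());
            }
        }
    }
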
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 5344f6c..023bbbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.processor;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public class TaskId implements Comparable<TaskId> {
@@ -47,6 +50,15 @@ public class TaskId implements Comparable<TaskId> {
}
}
+ public void writeTo(DataOutputStream out) throws IOException {
+ out.writeInt(topicGroupId);
+ out.writeInt(partition);
+ }
+
+ public static TaskId readFrom(DataInputStream in) throws IOException {
+ return new TaskId(in.readInt(), in.readInt());
+ }
+
public void writeTo(ByteBuffer buf) {
buf.putInt(topicGroupId);
buf.putInt(partition);
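
The new stream-based pair mirrors the existing ByteBuffer methods and is what the reworked AssignmentInfo below relies on. A quick round-trip sketch; the final println assumes TaskId.equals compares both fields:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.kafka.streams.processor.TaskId;

    public class TaskIdRoundTripSketch {
        public static void main(String[] args) throws IOException {
            TaskId original = new TaskId(0, 3);

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            original.writeTo(out); // writes topicGroupId first, then partition
            out.flush();

            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
            TaskId decoded = TaskId.readFrom(in); // reads the two ints back in order

            System.out.println(decoded.equals(original)); // true, if equals compares both fields
        }
    }
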
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 14037ab..e1b4d62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -28,31 +28,37 @@ import org.apache.kafka.streams.processor.TaskId;
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public abstract class AbstractTask {
protected final TaskId id;
protected final ProcessorTopology topology;
+ protected final Consumer consumer;
protected final ProcessorStateManager stateMgr;
protected final Set<TopicPartition> partitions;
protected ProcessorContext processorContext;
protected AbstractTask(TaskId id,
- Consumer<byte[], byte[]> restoreConsumer,
+ Collection<TopicPartition> partitions,
ProcessorTopology topology,
+ Consumer<byte[], byte[]> consumer,
+ Consumer<byte[], byte[]> restoreConsumer,
StreamingConfig config,
- Set<TopicPartition> partitions) {
+ boolean isStandby) {
this.id = id;
+ this.partitions = new HashSet<>(partitions);
this.topology = topology;
- this.partitions = partitions;
+ this.consumer = consumer;
// create the processor state manager
try {
File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
// isStandby indicates a standby task (previously inferred from a null partitions argument)
- this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, partitions == null);
+ this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
throw new KafkaException("Error while creating the state manager", e);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 451b214..54d5567 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -48,7 +48,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
private StreamThread streamThread;
private int numStandbyReplicas;
private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
- private Set<TaskId> standbyTasks;
+ private Map<TaskId, Set<TopicPartition>> standbyTasks;
@Override
public void configure(Map<String, ?> configs) {
@@ -154,28 +154,32 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
final int numConsumers = consumers.size();
List<TaskId> active = new ArrayList<>();
- Set<TaskId> standby = new HashSet<>();
+ Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
int i = 0;
for (String consumer : consumers) {
- List<TopicPartition> partitions = new ArrayList<>();
+ List<TopicPartition> activePartitions = new ArrayList<>();
final int numTaskIds = taskIds.size();
for (int j = i; j < numTaskIds; j += numConsumers) {
TaskId taskId = taskIds.get(j);
if (j < numActiveTasks) {
for (TopicPartition partition : partitionGroups.get(taskId)) {
- partitions.add(partition);
+ activePartitions.add(partition);
active.add(taskId);
}
} else {
- // no partition to a standby task
- standby.add(taskId);
+ Set<TopicPartition> standbyPartitions = standby.get(taskId);
+ if (standbyPartitions == null) {
+ standbyPartitions = new HashSet<>();
+ standby.put(taskId, standbyPartitions);
+ }
+ standbyPartitions.addAll(partitionGroups.get(taskId));
}
}
AssignmentInfo data = new AssignmentInfo(active, standby);
- assignment.put(consumer, new Assignment(partitions, data.encode()));
+ assignment.put(consumer, new Assignment(activePartitions, data.encode()));
i++;
active.clear();
@@ -220,7 +224,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
return partitionToTaskIds.get(partition);
}
- public Set<TaskId> standbyTasks() {
+ public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return standbyTasks;
}
}
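
The distribution is easiest to see with concrete numbers: task ids are laid out actives-first, each consumer takes every numConsumers-th entry, and entries past numActiveTasks become standby assignments that carry their full partition sets. A standalone sketch of just the modulo walk, with ints standing in for task ids:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinSketch {
        public static void main(String[] args) {
            List<Integer> taskIds = Arrays.asList(0, 1, 2, 3, 4, 5); // actives laid out first
            int numActiveTasks = 4;
            int numConsumers = 2;

            for (int i = 0; i < numConsumers; i++) {
                List<Integer> active = new ArrayList<>();
                List<Integer> standby = new ArrayList<>();
                for (int j = i; j < taskIds.size(); j += numConsumers) {
                    if (j < numActiveTasks)
                        active.add(taskIds.get(j));  // gets its input partitions
                    else
                        standby.add(taskIds.get(j)); // gets its changelog partitions
                }
                System.out.println("consumer" + i + ": active=" + active + " standby=" + standby);
            }
            // consumer0: active=[0, 2] standby=[4]
            // consumer1: active=[1, 3] standby=[5]
        }
    }
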
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2a8df9e..4cff02d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -33,6 +33,7 @@ import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -52,6 +53,7 @@ public class ProcessorStateManager {
private final Consumer<byte[], byte[]> restoreConsumer;
private final Map<TopicPartition, Long> restoredOffsets;
private final Map<TopicPartition, Long> checkpointedOffsets;
+ private final Map<TopicPartition, Long> offsetLimits;
private final boolean isStandby;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks
@@ -63,6 +65,7 @@ public class ProcessorStateManager {
this.restoredOffsets = new HashMap<>();
this.isStandby = isStandby;
this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
+ this.offsetLimits = new HashMap<>();
// create the state directory for this task if missing (we won't create the parent directory)
createStateDirectory(baseDir);
@@ -165,8 +168,10 @@ public class ProcessorStateManager {
// restore its state from changelog records; while restoring the log end offset
// should not change since it is only written by this thread.
+ long limit = offsetLimit(storePartition);
while (true) {
for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
+ if (record.offset() >= limit) break;
stateRestoreCallback.restore(record.key(), record.value());
}
@@ -178,7 +183,7 @@ public class ProcessorStateManager {
}
// record the restored offset for its change log partition
- long newOffset = restoreConsumer.position(storePartition);
+ long newOffset = Math.min(limit, restoreConsumer.position(storePartition));
restoredOffsets.put(storePartition, newOffset);
} finally {
// un-assign the change log partition
@@ -202,16 +207,40 @@ public class ProcessorStateManager {
return partitionsAndOffsets;
}
- public void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+ public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+ long limit = offsetLimit(storePartition);
+ List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
+
// restore states from changelog records
StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
+ long lastOffset = -1L;
+ int count = 0;
for (ConsumerRecord<byte[], byte[]> record : records) {
- restoreCallback.restore(record.key(), record.value());
+ if (record.offset() < limit) {
+ restoreCallback.restore(record.key(), record.value());
+ lastOffset = record.offset();
+ } else {
+ if (remainingRecords == null)
+ remainingRecords = new ArrayList<>(records.size() - count);
+
+ remainingRecords.add(record);
+ }
+ count++;
}
// record the restored offset for its change log partition
- long newOffset = restoreConsumer.position(storePartition);
- restoredOffsets.put(storePartition, newOffset);
+ restoredOffsets.put(storePartition, lastOffset + 1);
+
+ return remainingRecords;
+ }
+
+ public void putOffsetLimit(TopicPartition partition, long limit) {
+ offsetLimits.put(partition, limit);
+ }
+
+ private long offsetLimit(TopicPartition partition) {
+ Long limit = offsetLimits.get(partition);
+ return limit != null ? limit : Long.MAX_VALUE;
}
public StateStore getStore(String name) {
@@ -253,14 +282,14 @@ public class ProcessorStateManager {
if (stores.get(storeName).persistent()) {
Long offset = ackedOffsets.get(part);
- if (offset == null) {
- // if no record was produced. we need to check the restored offset.
- offset = restoredOffsets.get(part);
- }
-
if (offset != null) {
// store the last offset + 1 (the log position after restoration)
checkpointOffsets.put(part, offset + 1);
+ } else {
+ // if no record was produced, we need to check the restored offset.
+ offset = restoredOffsets.get(part);
+ if (offset != null)
+ checkpointOffsets.put(part, offset);
}
}
}
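
The heart of the standby change is the split in updateStandbyStates: records below the limit are applied, the rest are handed back so the caller can retry them once a later commit raises the limit, and a null return means the whole batch was consumed. A minimal standalone sketch of that split, with longs standing in for consumer records:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class OffsetLimitSketch {
        // mirrors the split in updateStandbyStates: apply below the limit,
        // hand the rest back; null means the whole batch was consumed
        static List<Long> applyUpTo(long limit, List<Long> offsets, List<Long> applied) {
            List<Long> remaining = null;
            for (long offset : offsets) {
                if (offset < limit) {
                    applied.add(offset);
                } else {
                    if (remaining == null)
                        remaining = new ArrayList<>();
                    remaining.add(offset);
                }
            }
            return remaining;
        }

        public static void main(String[] args) {
            List<Long> applied = new ArrayList<>();
            List<Long> remaining = applyUpTo(5L, Arrays.asList(3L, 4L, 5L, 6L), applied);
            System.out.println(applied);   // [3, 4]
            System.out.println(remaining); // [5, 6]
        }
    }
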
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index c6442d9..d0d8493 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
@@ -50,11 +51,13 @@ public class StandbyTask extends AbstractTask {
* @param metrics the {@link StreamingMetrics} created by the thread
*/
public StandbyTask(TaskId id,
- Consumer<byte[], byte[]> restoreConsumer,
+ Collection<TopicPartition> partitions,
ProcessorTopology topology,
+ Consumer<byte[], byte[]> consumer,
+ Consumer<byte[], byte[]> restoreConsumer,
StreamingConfig config,
StreamingMetrics metrics) {
- super(id, restoreConsumer, topology, config, null);
+ super(id, partitions, topology, consumer, restoreConsumer, config, true);
// initialize the topology with its own context
this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
@@ -64,6 +67,9 @@ public class StandbyTask extends AbstractTask {
((StandbyContextImpl) this.processorContext).initialized();
this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
+
+ // set initial offset limits
+ initializeOffsetLimits();
}
public Map<TopicPartition, Long> checkpointedOffsets() {
@@ -76,13 +82,24 @@ public class StandbyTask extends AbstractTask {
/**
* Updates a state store using records from one change log partition
+ * @return the records that were not applied because they are at or beyond the current offset limit, or null if all records were applied
*/
- public void update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
- stateMgr.updateStandbyStates(partition, records);
+ public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
+ return stateMgr.updateStandbyStates(partition, records);
}
public void commit() {
stateMgr.flush();
+
+ // reinitialize offset limits
+ initializeOffsetLimits();
+ }
+
+ protected void initializeOffsetLimits() {
+ for (TopicPartition partition : partitions) {
+ OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
+ stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
+ }
}
}
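
The limit itself comes straight from the input topic's committed offsets: a standby replica may only restore what the active task has already committed, and re-reading committed offsets on every commit() lets the ceiling advance. A toy rendition of that refresh, with maps standing in for consumer.committed() and stateMgr.putOffsetLimit():

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetLimitRefreshSketch {
        public static void main(String[] args) {
            // stand-ins for consumer.committed(partition) and stateMgr.putOffsetLimit(...)
            Map<String, Long> committed = new HashMap<>();
            Map<String, Long> limits = new HashMap<>();

            committed.put("counts-topic-0", 5L);
            refresh(committed, limits); // at task creation: limit = 5
            System.out.println(limits); // {counts-topic-0=5}

            committed.put("counts-topic-0", 9L); // the active task commits more input
            refresh(committed, limits);          // on StandbyTask.commit(): limit = 9
            System.out.println(limits);          // {counts-topic-0=9}
        }

        static void refresh(Map<String, Long> committed, Map<String, Long> limits) {
            for (Map.Entry<String, Long> entry : committed.entrySet()) {
                Long offset = entry.getValue();
                limits.put(entry.getKey(), offset != null ? offset : 0L); // no commit yet: restore nothing
            }
        }
    }
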
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 16f0667..24c450e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -30,9 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -45,7 +43,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
private final int maxBufferedSize;
- private final Consumer consumer;
private final PartitionGroup partitionGroup;
private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
private final PunctuationQueue punctuationQueue;
@@ -73,15 +70,14 @@ public class StreamTask extends AbstractTask implements Punctuator {
* @param metrics the {@link StreamingMetrics} created by the thread
*/
public StreamTask(TaskId id,
+ Collection<TopicPartition> partitions,
+ ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
- Collection<TopicPartition> partitions,
- ProcessorTopology topology,
StreamingConfig config,
StreamingMetrics metrics) {
- super(id, restoreConsumer, topology, config, Collections.unmodifiableSet(new HashSet<>(partitions)));
- this.consumer = consumer;
+ super(id, partitions, topology, consumer, restoreConsumer, config, false);
this.punctuationQueue = new PunctuationQueue();
this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
@@ -98,7 +94,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
- // initialize the consumed and produced offset cache
+ // initialize the consumed offset cache
this.consumedOffsets = new HashMap<>();
// create the record recordCollector that maintains the produced offsets
@@ -245,7 +241,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
if (commitOffsetNeeded) {
Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
- consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L));
+ TopicPartition partition = entry.getKey();
+ long offset = entry.getValue() + 1;
+ consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
+ stateMgr.putOffsetLimit(partition, offset);
}
consumer.commitSync(consumedOffsetsAndMetadata);
commitOffsetNeeded = false;
@@ -280,6 +279,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
}
+ @Override
public void close() {
this.partitionGroup.close();
this.consumedOffsets.clear();
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 31dca39..c77a027 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -73,6 +74,7 @@ public class StreamThread extends Thread {
protected final StreamingConfig config;
protected final TopologyBuilder builder;
+ protected final Set<String> sourceTopics;
protected final Producer<byte[], byte[]> producer;
protected final Consumer<byte[], byte[]> consumer;
protected final Consumer<byte[], byte[]> restoreConsumer;
@@ -94,6 +96,9 @@ public class StreamThread extends Thread {
private long lastCommit;
private long recordsProcessed;
+ private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
+ private boolean processStandbyRecords = false;
+
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -133,6 +138,7 @@ public class StreamThread extends Thread {
this.config = config;
this.builder = builder;
+ this.sourceTopics = builder.sourceTopics();
this.clientId = clientId;
this.clientUUID = clientUUID;
this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -148,6 +154,9 @@ public class StreamThread extends Thread {
this.standbyTasks = new HashMap<>();
this.prevTasks = new HashSet<>();
+ // buffered changelog records for standby tasks
+ this.standbyRecords = new HashMap<>();
+
// read in task specific config values
this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
this.stateDir.mkdir();
@@ -256,7 +265,7 @@ public class StreamThread extends Thread {
ensureCopartitioning(builder.copartitionGroups());
- consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
+ consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
while (stillRunning()) {
// try to fetch some records if necessary
@@ -293,15 +302,12 @@ public class StreamThread extends Thread {
}
maybePunctuate();
- maybeCommit();
} else {
// even when no task is assigned, we must poll to get a task.
requiresPoll = true;
}
-
- if (!standbyTasks.isEmpty()) {
- updateStandbyTasks();
- }
+ maybeCommit();
+ maybeUpdateStandbyTasks();
maybeClean();
}
@@ -310,13 +316,38 @@ public class StreamThread extends Thread {
}
}
- private void updateStandbyTasks() {
- ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+ private void maybeUpdateStandbyTasks() {
+ if (!standbyTasks.isEmpty()) {
+ if (processStandbyRecords) {
+ if (!standbyRecords.isEmpty()) {
+ for (StandbyTask task : standbyTasks.values()) {
+ for (TopicPartition partition : task.changeLogPartitions()) {
+ List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.remove(partition);
+ if (remaining != null) {
+ remaining = task.update(partition, remaining);
+ if (remaining != null) {
+ standbyRecords.put(partition, remaining);
+ } else {
+ restoreConsumer.resume(partition);
+ }
+ }
+ }
+ }
+ }
+ processStandbyRecords = false;
+ }
- if (!records.isEmpty()) {
- for (StandbyTask task : standbyTasks.values()) {
- for (TopicPartition partition : task.changeLogPartitions()) {
- task.update(partition, records.records(partition));
+ ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+
+ if (!records.isEmpty()) {
+ for (StandbyTask task : standbyTasks.values()) {
+ for (TopicPartition partition : task.changeLogPartitions()) {
+ List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
+ if (remaining != null) {
+ restoreConsumer.pause(partition);
+ standbyRecords.put(partition, remaining);
+ }
+ }
}
}
}
@@ -359,6 +390,8 @@ public class StreamThread extends Thread {
commitAll();
lastCommit = now;
+
+ processStandbyRecords = true;
} else {
for (StreamTask task : activeTasks.values()) {
try {
@@ -478,12 +511,12 @@ public class StreamThread extends Thread {
return tasks;
}
- protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();
ProcessorTopology topology = builder.build(id.topicGroupId);
- return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
+ return new StreamTask(id, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
}
private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -501,7 +534,7 @@ public class StreamThread extends Thread {
}
}
- // create the tasks
+ // create the active tasks
for (TaskId taskId : partitionsForTask.keySet()) {
try {
activeTasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
@@ -510,8 +543,6 @@ public class StreamThread extends Thread {
throw e;
}
}
-
- lastClean = time.milliseconds();
}
private void removeStreamTasks() {
@@ -537,13 +568,13 @@ public class StreamThread extends Thread {
sensors.taskDestructionSensor.record();
}
- protected StandbyTask createStandbyTask(TaskId id) {
+ protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();
ProcessorTopology topology = builder.build(id.topicGroupId);
if (!topology.stateStoreSuppliers().isEmpty()) {
- return new StandbyTask(id, restoreConsumer, topology, config, sensors);
+ return new StandbyTask(id, partitions, topology, consumer, restoreConsumer, config, sensors);
} else {
return null;
}
@@ -552,10 +583,15 @@ public class StreamThread extends Thread {
private void addStandbyTasks() {
Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
- for (TaskId taskId : partitionGrouper.standbyTasks()) {
- StandbyTask task = createStandbyTask(taskId);
+ // create the standby tasks
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionGrouper.standbyTasks().entrySet()) {
+ TaskId taskId = entry.getKey();
+ Set<TopicPartition> partitions = entry.getValue();
+ StandbyTask task = createStandbyTask(taskId, partitions);
if (task != null) {
standbyTasks.put(taskId, task);
+ // collect checkpointed offsets to position the restore consumer;
+ // these cover all partitions from which we restore states
checkpointedOffsets.putAll(task.checkpointedOffsets());
}
}
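
maybeUpdateStandbyTasks pairs with the limits above: when update() returns leftovers, their changelog partition is paused and the leftovers buffered in standbyRecords; once a commit flips processStandbyRecords, the buffer is retried and the partition resumed as soon as it drains. A toy rendition of that bookkeeping, with strings standing in for TopicPartitions and longs for records:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class StandbyBufferSketch {
        public static void main(String[] args) {
            Map<String, List<Long>> standbyRecords = new HashMap<>();
            Set<String> paused = new HashSet<>();
            long limit = 7L;

            // a poll returned offsets 5..8; 7 and 8 are at or past the limit
            List<Long> remaining = leftoverAtOrAbove(limit, Arrays.asList(5L, 6L, 7L, 8L));
            if (remaining != null) {
                paused.add("counts-topic-0");                 // restoreConsumer.pause(partition)
                standbyRecords.put("counts-topic-0", remaining);
            }

            // a commit raised the limit; retry the buffered records
            limit = 9L;
            List<Long> retry = standbyRecords.remove("counts-topic-0");
            if (leftoverAtOrAbove(limit, retry) == null)
                paused.remove("counts-topic-0");              // restoreConsumer.resume(partition)

            System.out.println("paused=" + paused); // paused=[]
        }

        // the records update() would hand back: everything at or past the limit
        static List<Long> leftoverAtOrAbove(long limit, List<Long> records) {
            List<Long> remaining = null;
            for (long record : records) {
                if (record >= limit) {
                    if (remaining == null)
                        remaining = new ArrayList<>();
                    remaining.add(record);
                }
            }
            return remaining;
        }
    }
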
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index d82dd7d..2bd4457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -17,14 +17,23 @@
package org.apache.kafka.streams.processor.internals.assignment;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.ByteBufferInputStream;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class AssignmentInfo {
@@ -33,70 +42,98 @@ public class AssignmentInfo {
public final int version;
public final List<TaskId> activeTasks; // each element corresponds to a partition
- public final Set<TaskId> standbyTasks;
+ public final Map<TaskId, Set<TopicPartition>> standbyTasks;
- public AssignmentInfo(List<TaskId> activeTasks, Set<TaskId> standbyTasks) {
+ public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
this(1, activeTasks, standbyTasks);
}
- protected AssignmentInfo(int version, List<TaskId> activeTasks, Set<TaskId> standbyTasks) {
+ protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
this.version = version;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
}
public ByteBuffer encode() {
- if (version == 1) {
- ByteBuffer buf = ByteBuffer.allocate(4 + 4 + activeTasks.size() * 8 + 4 + standbyTasks.size() * 8);
- // Encode version
- buf.putInt(1);
- // Encode active tasks
- buf.putInt(activeTasks.size());
- for (TaskId id : activeTasks) {
- id.writeTo(buf);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+
+ try {
+ if (version == 1) {
+ // Encode version
+ out.writeInt(1);
+ // Encode active tasks
+ out.writeInt(activeTasks.size());
+ for (TaskId id : activeTasks) {
+ id.writeTo(out);
+ }
+ // Encode standby tasks
+ out.writeInt(standbyTasks.size());
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
+ TaskId id = entry.getKey();
+ id.writeTo(out);
+
+ Set<TopicPartition> partitions = entry.getValue();
+ out.writeInt(partitions.size());
+ for (TopicPartition partition : partitions) {
+ out.writeUTF(partition.topic());
+ out.writeInt(partition.partition());
+ }
+ }
+
+ out.flush();
+ out.close();
+
+ return ByteBuffer.wrap(baos.toByteArray());
+
+ } else {
+ TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
+ log.error(ex.getMessage(), ex);
+ throw ex;
}
- // Encode standby tasks
- buf.putInt(standbyTasks.size());
- for (TaskId id : standbyTasks) {
- id.writeTo(buf);
- }
- buf.rewind();
-
- return buf;
-
- } else {
- TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version);
- log.error(ex.getMessage(), ex);
- throw ex;
+ } catch (IOException ex) {
+ throw new KafkaException("failed to encode AssignmentInfo", ex);
}
}
public static AssignmentInfo decode(ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();
-
- // Decode version
- int version = data.getInt();
- if (version == 1) {
- // Decode active tasks
- int count = data.getInt();
- List<TaskId> activeTasks = new ArrayList<>(count);
- for (int i = 0; i < count; i++) {
- activeTasks.add(TaskId.readFrom(data));
+ DataInputStream in = new DataInputStream(new ByteBufferInputStream(data));
+
+ try {
+ // Decode version
+ int version = in.readInt();
+ if (version == 1) {
+ // Decode active tasks
+ int count = in.readInt();
+ List<TaskId> activeTasks = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ activeTasks.add(TaskId.readFrom(in));
+ }
+ // Decode standby tasks
+ count = in.readInt();
+ Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
+ for (int i = 0; i < count; i++) {
+ TaskId id = TaskId.readFrom(in);
+
+ int numPartitions = in.readInt();
+ Set<TopicPartition> partitions = new HashSet<>(numPartitions);
+ for (int j = 0; j < numPartitions; j++) {
+ partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
+ }
+ standbyTasks.put(id, partitions);
+ }
+
+ return new AssignmentInfo(activeTasks, standbyTasks);
+
+ } else {
+ TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
+ log.error(ex.getMessage(), ex);
+ throw ex;
}
- // Decode standby tasks
- count = data.getInt();
- Set<TaskId> standbyTasks = new HashSet<>(count);
- for (int i = 0; i < count; i++) {
- standbyTasks.add(TaskId.readFrom(data));
- }
-
- return new AssignmentInfo(activeTasks, standbyTasks);
-
- } else {
- TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version);
- log.error(ex.getMessage(), ex);
- throw ex;
+ } catch (IOException ex) {
+ throw new KafkaException("failed to decode AssignmentInfo", ex);
}
}
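
Because the payload now nests a partition set under each standby task, the old fixed-size ByteBuffer arithmetic no longer works, hence the move to data streams. A round-trip sketch of the new wire format; the topic name is made up and the printed forms assume the usual TaskId/TopicPartition toString:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;

    public class AssignmentInfoRoundTrip {
        public static void main(String[] args) {
            List<TaskId> active = Arrays.asList(new TaskId(0, 0), new TaskId(0, 1));

            Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
            standby.put(new TaskId(0, 2),
                    new HashSet<>(Collections.singleton(new TopicPartition("counts-topic", 2))));

            ByteBuffer wire = new AssignmentInfo(active, standby).encode();
            AssignmentInfo decoded = AssignmentInfo.decode(wire);

            System.out.println(decoded.activeTasks);  // e.g. [0_0, 0_1]
            System.out.println(decoded.standbyTasks); // e.g. {0_2=[counts-topic-2]}
        }
    }
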
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c3512e/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
new file mode 100644
index 0000000..2ad1f47
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class KeyValueStoreChangeLogger<K, V> {
+
+ protected final Serdes<K, V> serialization;
+
+ private final Set<K> dirty;
+ private final Set<K> removed;
+ private final int maxDirty;
+ private final int maxRemoved;
+
+ private final String topic;
+ private int partition;
+ private ProcessorContext context;
+
+ // collects dirty and removed keys and flushes them to the changelog topic in batches
+ public KeyValueStoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+ this.topic = topic;
+ this.serialization = serialization;
+ this.context = context;
+ this.partition = context.id().partition;
+
+ this.dirty = new HashSet<>();
+ this.removed = new HashSet<>();
+ this.maxDirty = 100; // TODO: this needs to be configurable
+ this.maxRemoved = 100; // TODO: this needs to be configurable
+ }
+
+ public void add(K key) {
+ this.dirty.add(key);
+ this.removed.remove(key);
+ }
+
+ public void delete(K key) {
+ this.dirty.remove(key);
+ this.removed.add(key);
+ }
+
+ public void maybeLogChange(KeyValueStore<K, V> kv) {
+ if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+ logChange(kv);
+ }
+
+ public void logChange(KeyValueStore<K, V> kv) {
+ RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+ if (collector != null) {
+ Serializer<K> keySerializer = serialization.keySerializer();
+ Serializer<V> valueSerializer = serialization.valueSerializer();
+
+ for (K k : this.removed) {
+ collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+ }
+ for (K k : this.dirty) {
+ V v = kv.get(k);
+ collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+ }
+ this.removed.clear();
+ this.dirty.clear();
+ }
+ }
+
+}
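
The flush rule is the part worth internalizing: removed keys go out as null-value tombstones, dirty keys are re-read from the store so only their latest values are sent, and both sets are cleared afterwards. A standalone imitation of logChange over a plain map, with no RecordCollector involved:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ChangeLoggerSketch {
        public static void main(String[] args) {
            Map<String, Integer> store = new HashMap<>(); // stands in for the KeyValueStore
            Set<String> dirty = new HashSet<>();
            Set<String> removed = new HashSet<>();

            store.put("a", 1);                 // put: mark dirty, unmark removed
            dirty.add("a");
            removed.remove("a");

            store.remove("b");                 // delete: mark removed, unmark dirty
            dirty.remove("b");
            removed.add("b");

            // flush, as logChange does: tombstones first, then latest values
            for (String k : removed)
                System.out.println(k + " -> null");            // b -> null
            for (String k : dirty)
                System.out.println(k + " -> " + store.get(k)); // a -> 1
            removed.clear();
            dirty.clear();
        }
    }
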