You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/08 21:32:12 UTC
[kafka] branch 1.1 updated: MINOR: Add missing generics and
surpress warning annotations (#4518)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new c8658ab MINOR: Add missing generics and surpress warning annotations (#4518)
c8658ab is described below
commit c8658abec76f9ec0e4715b18867f973d45155839
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Feb 8 13:21:56 2018 -0800
MINOR: Add missing generics and surpress warning annotations (#4518)
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../java/org/apache/kafka/streams/Topology.java | 18 +-
.../streams/kstream/internals/AbstractStream.java | 1 +
.../kstream/internals/KGroupedTableImpl.java | 5 +-
.../streams/state/internals/RocksDBStore.java | 182 ++++++++++-----------
.../RocksDbKeyValueBytesStoreSupplier.java | 5 +-
.../kafka/streams/state/internals/Segment.java | 7 +-
.../org/apache/kafka/streams/TopologyTest.java | 2 +-
.../streams/state/internals/RocksDBStoreTest.java | 128 +++++++++++----
.../apache/kafka/streams/TopologyTestDriver.java | 4 +-
.../kafka/streams/test/ConsumerRecordFactory.java | 2 +-
.../kafka/streams/TopologyTestDriverTest.java | 1 +
11 files changed, 200 insertions(+), 155 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 3b1ac6d..c137a30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -448,10 +448,10 @@ public class Topology {
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized Topology addSink(final String name,
- final String topic,
- final StreamPartitioner partitioner,
- final String... parentNames) {
+ public synchronized <K, V> Topology addSink(final String name,
+ final String topic,
+ final StreamPartitioner<? super K, ? super V> partitioner,
+ final String... parentNames) {
internalTopologyBuilder.addSink(name, topic, null, null, partitioner, parentNames);
return this;
}
@@ -476,11 +476,11 @@ public class Topology {
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized Topology addSink(final String name,
- final String topic,
- final Serializer keySerializer,
- final Serializer valueSerializer,
- final String... parentNames) {
+ public synchronized <K, V> Topology addSink(final String name,
+ final String topic,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final String... parentNames) {
internalTopologyBuilder.addSink(name, topic, keySerializer, valueSerializer, null, parentNames);
return this;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 41cdec2..7410a0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -136,6 +136,7 @@ public abstract class AbstractStream<K> {
public InternalValueTransformerWithKey<K, V, VR> get() {
final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
return new InternalValueTransformerWithKey<K, V, VR>() {
+ @SuppressWarnings("deprecation")
@Override
public VR punctuate(final long timestamp) {
return valueTransformer.punctuate(timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index d5a4e71..6e33251 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
@@ -49,7 +48,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
protected final Serde<K> keySerde;
protected final Serde<V> valSerde;
- private boolean isQueryable = true;
+ private boolean isQueryable;
private final Initializer<Long> countInitializer = new Initializer<Long>() {
@Override
public Long apply() {
@@ -142,7 +141,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
@SuppressWarnings("deprecation")
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String functionName,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
final String funcName = builder.newProcessorName(functionName);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 67ec915..a2e45e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@@ -28,11 +27,9 @@ import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallba
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
-import org.apache.kafka.streams.state.StateSerdes;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
@@ -66,12 +63,9 @@ import java.util.Set;
* If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
* i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
*
- * @param <K> The key type
- * @param <V> The value type
- *
* @see org.apache.kafka.streams.state.Stores#create(String)
*/
-public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
private static final int TTL_NOT_USED = -1;
@@ -89,10 +83,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<KeyValueIterator>());
File dbDir;
- private StateSerdes<K, V> serdes;
- private final Serde<K> keySerde;
- private final Serde<V> valueSerde;
-
private RocksDB db;
// the following option objects will be created in the constructor and closed in the close() method
@@ -107,19 +97,17 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
protected volatile boolean open = false;
- RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
- this(name, DB_FILE_DIR, keySerde, valueSerde);
+ RocksDBStore(String name) {
+ this(name, DB_FILE_DIR);
}
- RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) {
+ RocksDBStore(String name, String parentDir) {
this.name = name;
this.parentDir = parentDir;
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
}
@SuppressWarnings("unchecked")
- public void openDB(ProcessorContext context) {
+ public void openDB(final ProcessorContext context) {
// initialize the default rocksdb options
final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
@@ -161,13 +149,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
configSetter.setConfig(name, options, configs);
}
- // we need to construct the serde while opening DB since
- // it is also triggered by windowed DB segments without initialization
- this.serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
try {
@@ -179,7 +160,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
open = true;
}
- public void init(ProcessorContext context, StateStore root) {
+ public void init(final ProcessorContext context,
+ final StateStore root) {
// open the DB dir
this.internalProcessorContext = context;
openDB(context);
@@ -190,7 +172,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
context.register(root, false, this.batchingStateRestoreCallback);
}
- private RocksDB openDB(File dir, Options options, int ttl) throws IOException {
+ private RocksDB openDB(final File dir,
+ final Options options,
+ final int ttl) throws IOException {
try {
if (ttl == TTL_NOT_USED) {
Files.createDirectories(dir.getParentFile().toPath());
@@ -200,7 +184,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
// TODO: support TTL with change log?
// return TtlDB.open(options, dir.toString(), ttl, false);
}
- } catch (RocksDBException e) {
+ } catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
}
}
@@ -226,10 +210,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public synchronized V get(K key) {
+ public synchronized byte[] get(final Bytes key) {
validateStoreOpen();
- byte[] byteValue = getInternal(serdes.rawKey(key));
- return byteValue == null ? null : serdes.valueFrom(byteValue);
+ return getInternal(key.get());
}
private void validateStoreOpen() {
@@ -238,23 +221,22 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
}
- private byte[] getInternal(byte[] rawKey) {
+ private byte[] getInternal(final byte[] rawKey) {
try {
return this.db.get(rawKey);
- } catch (RocksDBException e) {
- throw new ProcessorStateException("Error while getting value for key " + serdes.keyFrom(rawKey) +
- " from store " + this.name, e);
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
}
}
- private void toggleDbForBulkLoading(boolean prepareForBulkload) {
+ private void toggleDbForBulkLoading(final boolean prepareForBulkload) {
if (prepareForBulkload) {
// if the store is not empty, we need to compact to get around the num.levels check
// for bulk loading
final String[] sstFileNames = dbDir.list(new FilenameFilter() {
@Override
- public boolean accept(File dir, String name) {
+ public boolean accept(final File dir, final String name) {
return name.matches(".*\\.sst");
}
});
@@ -262,7 +244,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
if (sstFileNames != null && sstFileNames.length > 0) {
try {
this.db.compactRange(true, 1, 0);
- } catch (RocksDBException e) {
+ } catch (final RocksDBException e) {
throw new ProcessorStateException("Error while range compacting during restoring store " + this.name, e);
}
@@ -280,27 +262,27 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@SuppressWarnings("unchecked")
@Override
- public synchronized void put(K key, V value) {
+ public synchronized void put(final Bytes key,
+ final byte[] value) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
- byte[] rawKey = serdes.rawKey(key);
- byte[] rawValue = serdes.rawValue(value);
- putInternal(rawKey, rawValue);
+ putInternal(key.get(), value);
}
@Override
- public synchronized V putIfAbsent(K key, V value) {
+ public synchronized byte[] putIfAbsent(final Bytes key,
+ final byte[] value) {
Objects.requireNonNull(key, "key cannot be null");
- V originalValue = get(key);
+ final byte[] originalValue = get(key);
if (originalValue == null) {
put(key, value);
}
return originalValue;
}
- private void restoreAllInternal(Collection<KeyValue<byte[], byte[]>> records) {
- try (WriteBatch batch = new WriteBatch()) {
- for (KeyValue<byte[], byte[]> record : records) {
+ private void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
+ try (final WriteBatch batch = new WriteBatch()) {
+ for (final KeyValue<byte[], byte[]> record : records) {
if (record.value == null) {
batch.remove(record.key);
} else {
@@ -308,98 +290,96 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
}
db.write(wOptions, batch);
- } catch (RocksDBException e) {
+ } catch (final RocksDBException e) {
throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
}
}
- private void putInternal(byte[] rawKey, byte[] rawValue) {
+ private void putInternal(final byte[] rawKey,
+ final byte[] rawValue) {
if (rawValue == null) {
try {
db.delete(wOptions, rawKey);
- } catch (RocksDBException e) {
- throw new ProcessorStateException("Error while removing key " + serdes.keyFrom(rawKey) +
- " from store " + this.name, e);
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error while removing key from store " + this.name, e);
}
} else {
try {
db.put(wOptions, rawKey, rawValue);
- } catch (RocksDBException e) {
- throw new ProcessorStateException("Error while executing put key " + serdes.keyFrom(rawKey) +
- " and value " + serdes.keyFrom(rawValue) + " from store " + this.name, e);
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error while executing putting key/value into store " + this.name, e);
}
}
}
@Override
- public void putAll(List<KeyValue<K, V>> entries) {
- try (WriteBatch batch = new WriteBatch()) {
- for (KeyValue<K, V> entry : entries) {
+ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ try (final WriteBatch batch = new WriteBatch()) {
+ for (final KeyValue<Bytes, byte[]> entry : entries) {
Objects.requireNonNull(entry.key, "key cannot be null");
- final byte[] rawKey = serdes.rawKey(entry.key);
- final byte[] rawValue = serdes.rawValue(entry.value);
- if (rawValue == null) {
- batch.remove(rawKey);
+ if (entry.value == null) {
+ batch.remove(entry.key.get());
} else {
- batch.put(rawKey, rawValue);
+ batch.put(entry.key.get(), entry.value);
}
}
db.write(wOptions, batch);
- } catch (RocksDBException e) {
+ } catch (final RocksDBException e) {
throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
}
}
@Override
- public synchronized V delete(K key) {
+ public synchronized byte[] delete(final Bytes key) {
Objects.requireNonNull(key, "key cannot be null");
- V value = get(key);
+ final byte[] value = get(key);
put(key, null);
return value;
}
@Override
- public synchronized KeyValueIterator<K, V> range(K from, K to) {
+ public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to) {
Objects.requireNonNull(from, "from cannot be null");
Objects.requireNonNull(to, "to cannot be null");
validateStoreOpen();
// query rocksdb
- final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), serdes, from, to);
+ final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), from, to);
openIterators.add(rocksDBRangeIterator);
return rocksDBRangeIterator;
}
@Override
- public synchronized KeyValueIterator<K, V> all() {
+ public synchronized KeyValueIterator<Bytes, byte[]> all() {
validateStoreOpen();
// query rocksdb
- RocksIterator innerIter = db.newIterator();
+ final RocksIterator innerIter = db.newIterator();
innerIter.seekToFirst();
- final RocksDbIterator rocksDbIterator = new RocksDbIterator(name, innerIter, serdes);
+ final RocksDbIterator rocksDbIterator = new RocksDbIterator(name, innerIter);
openIterators.add(rocksDbIterator);
return rocksDbIterator;
}
- public synchronized KeyValue<K, V> first() {
+ public synchronized KeyValue<Bytes, byte[]> first() {
validateStoreOpen();
-
- RocksIterator innerIter = db.newIterator();
+
+ final RocksIterator innerIter = db.newIterator();
innerIter.seekToFirst();
- KeyValue<K, V> pair = new KeyValue<>(serdes.keyFrom(innerIter.key()), serdes.valueFrom(innerIter.value()));
+ final KeyValue<Bytes, byte[]> pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
innerIter.close();
return pair;
}
- public synchronized KeyValue<K, V> last() {
+ public synchronized KeyValue<Bytes, byte[]> last() {
validateStoreOpen();
-
- RocksIterator innerIter = db.newIterator();
+
+ final RocksIterator innerIter = db.newIterator();
innerIter.seekToLast();
- KeyValue<K, V> pair = new KeyValue<>(serdes.keyFrom(innerIter.key()), serdes.valueFrom(innerIter.value()));
+ final KeyValue<Bytes, byte[]> pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
innerIter.close();
return pair;
@@ -419,10 +399,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public long approximateNumEntries() {
validateStoreOpen();
- long value;
+ final long value;
try {
value = this.db.getLongProperty("rocksdb.estimate-num-keys");
- } catch (RocksDBException e) {
+ } catch (final RocksDBException e) {
throw new ProcessorStateException("Error fetching property from store " + this.name, e);
}
if (isOverflowing(value)) {
@@ -431,7 +411,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
return value;
}
- private boolean isOverflowing(long value) {
+ private boolean isOverflowing(final long value) {
// RocksDB returns an unsigned 8-byte integer, which could overflow long
// and manifest as a negative value.
return value < 0;
@@ -451,7 +431,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private void flushInternal() {
try {
db.flush(fOptions);
- } catch (RocksDBException e) {
+ } catch (final RocksDBException e) {
throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
}
}
@@ -476,25 +456,24 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
private void closeOpenIterators() {
- HashSet<KeyValueIterator> iterators;
+ final HashSet<KeyValueIterator> iterators;
synchronized (openIterators) {
iterators = new HashSet<>(openIterators);
}
- for (KeyValueIterator iterator : iterators) {
+ for (final KeyValueIterator iterator : iterators) {
iterator.close();
}
}
- private class RocksDbIterator implements KeyValueIterator<K, V> {
+ private class RocksDbIterator implements KeyValueIterator<Bytes, byte[]> {
private final String storeName;
private final RocksIterator iter;
- private final StateSerdes<K, V> serdes;
private volatile boolean open = true;
- RocksDbIterator(String storeName, RocksIterator iter, StateSerdes<K, V> serdes) {
+ RocksDbIterator(final String storeName,
+ final RocksIterator iter) {
this.iter = iter;
- this.serdes = serdes;
this.storeName = storeName;
}
@@ -502,8 +481,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
return iter.key();
}
- private KeyValue<K, V> getKeyValue() {
- return new KeyValue<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+ private KeyValue<Bytes, byte[]> getKeyValue() {
+ return new KeyValue<>(new Bytes(iter.key()), iter.value());
}
@Override
@@ -519,11 +498,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
* @throws NoSuchElementException if no next element exist
*/
@Override
- public synchronized KeyValue<K, V> next() {
+ public synchronized KeyValue<Bytes, byte[]> next() {
if (!hasNext())
throw new NoSuchElementException();
- KeyValue<K, V> entry = this.getKeyValue();
+ final KeyValue<Bytes, byte[]> entry = this.getKeyValue();
iter.next();
return entry;
}
@@ -541,11 +520,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public K peekNextKey() {
+ public Bytes peekNextKey() {
if (!hasNext()) {
throw new NoSuchElementException();
}
- return serdes.keyFrom(iter.key());
+ return new Bytes(iter.key());
}
}
@@ -554,12 +533,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
- private byte[] rawToKey;
-
- RocksDBRangeIterator(String storeName, RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) {
- super(storeName, iter, serdes);
- iter.seek(serdes.rawKey(from));
- this.rawToKey = serdes.rawKey(to);
+ private final byte[] rawToKey;
+
+ RocksDBRangeIterator(final String storeName,
+ final RocksIterator iter,
+ final Bytes from,
+ final Bytes to) {
+ super(storeName, iter);
+ iter.seek(from.get());
+ this.rawToKey = to.get();
if (this.rawToKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
index 7870579..b0ad619 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -36,9 +35,7 @@ public class RocksDbKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupp
@Override
public KeyValueStore<Bytes, byte[]> get() {
- return new RocksDBStore<>(name,
- Serdes.Bytes(),
- Serdes.ByteArray());
+ return new RocksDBStore(name);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index 50c1547..7b2b803 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -16,18 +16,16 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.io.IOException;
-// Use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
-class Segment extends RocksDBStore<Bytes, byte[]> implements Comparable<Segment> {
+class Segment extends RocksDBStore implements Comparable<Segment> {
public final long id;
Segment(String segmentName, String windowName, long id) {
- super(segmentName, windowName, WindowStoreUtils.INNER_KEY_SERDE, WindowStoreUtils.INNER_VALUE_SERDE);
+ super(segmentName, windowName);
this.id = id;
}
@@ -43,7 +41,6 @@ class Segment extends RocksDBStore<Bytes, byte[]> implements Comparable<Segment>
@Override
public void openDB(final ProcessorContext context) {
super.openDB(context);
-
// skip the registering step
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 0a45803..992ffd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -157,7 +157,7 @@ public class TopologyTest {
}
@Test
- public void shoudNotAllowToAddProcessorWithSameName() {
+ public void shouldNotAllowToAddProcessorWithSameName() {
topology.addSource("source", "topic-1");
topology.addProcessor("processor", new MockProcessorSupplier(), "source");
try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 1f15a03..49e893b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -17,7 +17,12 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@@ -54,13 +59,15 @@ import static org.junit.Assert.fail;
public class RocksDBStoreTest {
private final File tempDir = TestUtils.tempDirectory();
- private RocksDBStore<String, String> subject;
+ private Serializer<String> stringSerializer = new StringSerializer();
+ private Deserializer<String> stringDeserializer = new StringDeserializer();
+ private RocksDBStore subject;
private MockProcessorContext context;
private File dir;
@Before
public void setUp() {
- subject = new RocksDBStore<>("test", Serdes.String(), Serdes.String());
+ subject = new RocksDBStore("test");
dir = TestUtils.tempDirectory();
context = new MockProcessorContext(dir,
Serdes.String(),
@@ -81,7 +88,8 @@ public class RocksDBStoreTest {
final String message = "how can a 4 ounce bird carry a 2lb coconut";
int intKey = 1;
for (int i = 0; i < 2000000; i++) {
- subject.put("theKeyIs" + intKey++, message);
+ subject.put(new Bytes(stringSerializer.serialize(null, "theKeyIs" + intKey++)),
+ stringSerializer.serialize(null, message));
}
final List<KeyValue<byte[], byte[]>> restoreBytes = new ArrayList<>();
@@ -92,7 +100,11 @@ public class RocksDBStoreTest {
context.restore("test", restoreBytes);
- assertThat(subject.get("restoredKey"), equalTo("restoredValue"));
+ assertThat(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "restoredKey")))),
+ equalTo("restoredValue"));
}
@Test
@@ -122,18 +134,36 @@ public class RocksDBStoreTest {
@Test
public void shouldPutAll() {
- List<KeyValue<String, String>> entries = new ArrayList<>();
- entries.add(new KeyValue<>("1", "a"));
- entries.add(new KeyValue<>("2", "b"));
- entries.add(new KeyValue<>("3", "c"));
+ List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+ entries.add(new KeyValue<>(
+ new Bytes(stringSerializer.serialize(null, "1")),
+ stringSerializer.serialize(null, "a")));
+ entries.add(new KeyValue<>(
+ new Bytes(stringSerializer.serialize(null, "2")),
+ stringSerializer.serialize(null, "b")));
+ entries.add(new KeyValue<>(
+ new Bytes(stringSerializer.serialize(null, "3")),
+ stringSerializer.serialize(null, "c")));
subject.init(context, subject);
subject.putAll(entries);
subject.flush();
- assertEquals(subject.get("1"), "a");
- assertEquals(subject.get("2"), "b");
- assertEquals(subject.get("3"), "c");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+ "a");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+ "b");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+ "c");
}
@Test
@@ -173,9 +203,21 @@ public class RocksDBStoreTest {
subject.init(context, subject);
context.restore(subject.name(), entries);
- assertEquals(subject.get("1"), "a");
- assertEquals(subject.get("2"), "b");
- assertEquals(subject.get("3"), "c");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+ "a");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+ "b");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+ "c");
}
@@ -187,11 +229,11 @@ public class RocksDBStoreTest {
subject.init(context, subject);
context.restore(subject.name(), entries);
- final KeyValueIterator<String, String> iterator = subject.all();
+ final KeyValueIterator<Bytes, byte[]> iterator = subject.all();
final Set<String> keys = new HashSet<>();
while (iterator.hasNext()) {
- keys.add(iterator.next().key);
+ keys.add(stringDeserializer.deserialize(null, iterator.next().key.get()));
}
assertThat(keys, equalTo(Utils.mkSet("2", "3")));
@@ -211,18 +253,30 @@ public class RocksDBStoreTest {
subject.init(context, subject);
context.restore(subject.name(), entries);
- final KeyValueIterator<String, String> iterator = subject.all();
+ final KeyValueIterator<Bytes, byte[]> iterator = subject.all();
final Set<String> keys = new HashSet<>();
while (iterator.hasNext()) {
- keys.add(iterator.next().key);
+ keys.add(stringDeserializer.deserialize(null, iterator.next().key.get()));
}
assertThat(keys, equalTo(Utils.mkSet("1", "2", "3")));
- assertEquals(subject.get("1"), "restored");
- assertEquals(subject.get("2"), "b");
- assertEquals(subject.get("3"), "c");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+ "restored");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+ "b");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+ "c");
}
@Test
@@ -233,9 +287,21 @@ public class RocksDBStoreTest {
context.restore(subject.name(), entries);
- assertEquals(subject.get("1"), "a");
- assertEquals(subject.get("2"), "b");
- assertEquals(subject.get("3"), "c");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+ "a");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+ "b");
+ assertEquals(
+ stringDeserializer.deserialize(
+ null,
+ subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+ "c");
entries.clear();
@@ -245,11 +311,11 @@ public class RocksDBStoreTest {
context.restore(subject.name(), entries);
- final KeyValueIterator<String, String> iterator = subject.all();
+ final KeyValueIterator<Bytes, byte[]> iterator = subject.all();
final Set<String> keys = new HashSet<>();
while (iterator.hasNext()) {
- keys.add(iterator.next().key);
+ keys.add(stringDeserializer.deserialize(null, iterator.next().key.get()));
}
assertThat(keys, equalTo(Utils.mkSet("2", "3")));
@@ -261,7 +327,7 @@ public class RocksDBStoreTest {
public void shouldThrowNullPointerExceptionOnNullPut() {
subject.init(context, subject);
try {
- subject.put(null, "someVal");
+ subject.put(null, stringSerializer.serialize(null, "someVal"));
fail("Should have thrown NullPointerException on null put()");
} catch (NullPointerException e) { }
}
@@ -270,7 +336,7 @@ public class RocksDBStoreTest {
public void shouldThrowNullPointerExceptionOnNullPutAll() {
subject.init(context, subject);
try {
- subject.put(null, "someVal");
+ subject.put(null, stringSerializer.serialize(null, "someVal"));
fail("Should have thrown NullPointerException on null put()");
} catch (NullPointerException e) { }
}
@@ -297,7 +363,7 @@ public class RocksDBStoreTest {
public void shouldThrowNullPointerExceptionOnRange() {
subject.init(context, subject);
try {
- subject.range(null, "2");
+ subject.range(null, new Bytes(stringSerializer.serialize(null, "2")));
fail("Should have thrown NullPointerException on deleting null key");
} catch (NullPointerException e) { }
}
@@ -306,7 +372,9 @@ public class RocksDBStoreTest {
public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
subject.init(context, subject);
Utils.delete(dir);
- subject.put("anyKey", "anyValue");
+ subject.put(
+ new Bytes(stringSerializer.serialize(null, "anyKey")),
+ stringSerializer.serialize(null, "anyValue"));
subject.flush();
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 6168640..ff63554 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -323,7 +323,7 @@ public class TopologyTestDriver {
offset,
consumerRecord.timestamp(),
consumerRecord.timestampType(),
- consumerRecord.checksum(),
+ ConsumerRecord.NULL_CHECKSUM,
consumerRecord.serializedKeySize(),
consumerRecord.serializedValueSize(),
consumerRecord.key(),
@@ -376,7 +376,7 @@ public class TopologyTestDriver {
offset,
consumerRecord.timestamp(),
consumerRecord.timestampType(),
- consumerRecord.checksum(),
+ ConsumerRecord.NULL_CHECKSUM,
consumerRecord.serializedKeySize(),
consumerRecord.serializedValueSize(),
consumerRecord.key(),
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index ea8d632..b0ccd61 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -175,7 +175,7 @@ public class ConsumerRecordFactory<K, V> {
-1L,
timestampMs,
TimestampType.CREATE_TIME,
- 0L,
+ ConsumerRecord.NULL_CHECKSUM,
serializedKey == null ? 0 : serializedKey.length,
serializedValue == null ? 0 : serializedValue.length,
serializedKey,
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 5073efd..921f6d6 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -190,6 +190,7 @@ public class TopologyTestDriverTest {
context.forward(key, value);
}
+ @SuppressWarnings("deprecation")
@Override
public void punctuate(long timestamp) {} // deprecated
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.