Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/21 17:47:31 UTC
kafka git commit: MINOR: fix record collector to stick with streams partitioner behavior if it is specified
Repository: kafka
Updated Branches:
refs/heads/trunk 93fbda4c5 -> 3bedcce01
MINOR: fix record collector to stick with streams partitioner behavior if it is specified
If `partition == null` and `partitioner != null`, we should not fall back to the default partitioner (as we did before this patch whenever `producer.partitionsFor(...)` returned an empty list). Falling back to the default partitioner might corrupt hash partitioning: records the custom partitioner would route to one partition end up scattered by the producer's default hash instead.
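In effect, the partitioner-aware send() now resolves the partition up front and fails fast when no partition metadata is available, rather than silently handing a null partition to the producer. A minimal sketch of the resolution logic introduced by the patch below (the wrapper class name is hypothetical; Producer, PartitionInfo, StreamPartitioner, and StreamsException are the real Kafka types):

    import java.util.List;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.streams.errors.StreamsException;
    import org.apache.kafka.streams.processor.StreamPartitioner;

    final class PartitionResolutionSketch {
        // If a StreamPartitioner is supplied, it alone decides the partition;
        // an empty partition list is a hard error instead of a silent
        // fall-through to the producer's default partitioner.
        static <K, V> Integer resolve(final Producer<byte[], byte[]> producer,
                                      final String topic,
                                      final K key,
                                      final V value,
                                      final StreamPartitioner<? super K, ? super V> partitioner) {
            if (partitioner == null) {
                return null; // null partition: the producer picks one itself
            }
            final List<PartitionInfo> partitions = producer.partitionsFor(topic);
            if (partitions.isEmpty()) {
                throw new StreamsException("Could not get partition information for topic '"
                        + topic + "'. This can happen if the topic does not exist.");
            }
            return partitioner.partition(key, value, partitions.size());
        }
    }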
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Eno Thereska, Damian Guy, Guozhang Wang
Closes #2868 from mjsax/minor-fix-RecordCollector
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bedcce0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bedcce0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bedcce0
Branch: refs/heads/trunk
Commit: 3bedcce01b9d3e8ba0516fa33eb59d57817ce27e
Parents: 93fbda4
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Apr 21 10:47:28 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 21 10:47:28 2017 -0700
----------------------------------------------------------------------
.../processor/internals/RecordCollector.java | 26 ++---
.../internals/RecordCollectorImpl.java | 72 +++++++------
.../streams/processor/internals/SinkNode.java | 34 ++++---
.../processor/internals/StandbyContextImpl.java | 74 +++++++-------
.../internals/RecordCollectorTest.java | 68 ++++++++-----
.../streams/state/KeyValueStoreTestDriver.java | 101 +++++++++----------
.../state/internals/StoreChangeLoggerTest.java | 29 +++---
.../apache/kafka/test/KStreamTestDriver.java | 94 ++++++++---------
.../apache/kafka/test/NoOpRecordCollector.java | 29 +++---
9 files changed, 277 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 1fa5b97..4516f8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -23,22 +23,22 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import java.util.Map;
public interface RecordCollector {
+
<K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer);
+ final K key,
+ final V value,
+ final Integer partition,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer);
<K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
- StreamPartitioner<? super K, ? super V> partitioner);
+ final K key,
+ final V value,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner);
void flush();
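From the caller's side, the split is now clean: pass an explicit partition, or pass a StreamPartitioner, never both. A hedged usage sketch against the interface above (the helper class, topic name, and values are placeholders, not part of the patch):

    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.streams.processor.StreamPartitioner;
    import org.apache.kafka.streams.processor.internals.RecordCollector;

    final class SendUsageSketch {
        static void demo(final RecordCollector collector,
                         final Serializer<String> serializer,
                         final StreamPartitioner<String, String> partitioner) {
            // Explicit partition: the caller has already decided where the record goes.
            collector.send("output-topic", "key", "value", 0, 42L, serializer, serializer);

            // StreamPartitioner: the collector resolves the partition itself and
            // throws a StreamsException if no partition metadata is available.
            collector.send("output-topic", "key", "value", 42L, serializer, serializer, partitioner);
        }
    }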
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 0122ea0..9d2ac03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -46,54 +46,62 @@ public class RecordCollectorImpl implements RecordCollector {
private volatile Exception sendException;
- public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
+ public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId) {
this.producer = producer;
- this.offsets = new HashMap<>();
- this.logPrefix = String.format("task [%s]", streamTaskId);
+ offsets = new HashMap<>();
+ logPrefix = String.format("task [%s]", streamTaskId);
}
@Override
public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
- send(topic, key, value, partition, timestamp, keySerializer, valueSerializer, null);
+ final K key,
+ final V value,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner) {
+ Integer partition = null;
+
+ if (partitioner != null) {
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ if (partitions.size() > 0) {
+ partition = partitioner.partition(key, value, partitions.size());
+ } else {
+ throw new StreamsException("Could not get partition information for topic '" + topic + "'." +
+ " This can happen if the topic does not exist.");
+ }
+ }
+
+ send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
}
@Override
public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
- StreamPartitioner<? super K, ? super V> partitioner) {
+ final K key,
+ final V value,
+ final Integer partition,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {
checkForException();
- byte[] keyBytes = keySerializer.serialize(topic, key);
- byte[] valBytes = valueSerializer.serialize(topic, value);
- if (partition == null && partitioner != null) {
- List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
- if (partitions != null && partitions.size() > 0)
- partition = partitioner.partition(key, value, partitions.size());
- }
+ final byte[] keyBytes = keySerializer.serialize(topic, key);
+ final byte[] valBytes = valueSerializer.serialize(topic, value);
- ProducerRecord<byte[], byte[]> serializedRecord =
+ final ProducerRecord<byte[], byte[]> serializedRecord =
new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
- for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
+ // counting from 1 to make check further down more natural
+ // -> `if (attempt == MAX_SEND_ATTEMPTS)`
+ for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; ++attempt) {
try {
- this.producer.send(serializedRecord, new Callback() {
+ producer.send(serializedRecord, new Callback() {
@Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
+ public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception == null) {
if (sendException != null) {
return;
}
- TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+ final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
offsets.put(tp, metadata.offset());
} else {
if (sendException == null) {
@@ -104,7 +112,7 @@ public class RecordCollectorImpl implements RecordCollector {
}
});
return;
- } catch (TimeoutException e) {
+ } catch (final TimeoutException e) {
if (attempt == MAX_SEND_ATTEMPTS) {
throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
}
@@ -124,7 +132,7 @@ public class RecordCollectorImpl implements RecordCollector {
@Override
public void flush() {
log.debug("{} Flushing producer", logPrefix);
- this.producer.flush();
+ producer.flush();
checkForException();
}
@@ -144,7 +152,7 @@ public class RecordCollectorImpl implements RecordCollector {
*/
@Override
public Map<TopicPartition, Long> offsets() {
- return this.offsets;
+ return offsets;
}
// for testing only
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 3d4f282..00e5dca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -31,7 +31,11 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
private ProcessorContext context;
- public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
+ public SinkNode(final String name,
+ final String topic,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner) {
super(name);
this.topic = topic;
@@ -44,30 +48,35 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
* @throws UnsupportedOperationException if this method adds a child to a sink node
*/
@Override
- public void addChild(ProcessorNode<?, ?> child) {
+ public void addChild(final ProcessorNode<?, ?> child) {
throw new UnsupportedOperationException("sink node does not allow addChild");
}
@SuppressWarnings("unchecked")
@Override
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
super.init(context);
this.context = context;
// if serializers are null, get the default ones from the context
- if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerde().serializer();
- if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerde().serializer();
+ if (keySerializer == null) {
+ keySerializer = (Serializer<K>) context.keySerde().serializer();
+ }
+ if (valSerializer == null) {
+ valSerializer = (Serializer<V>) context.valueSerde().serializer();
+ }
// if value serializers are for {@code Change} values, set the inner serializer when necessary
- if (this.valSerializer instanceof ChangedSerializer &&
- ((ChangedSerializer) this.valSerializer).inner() == null)
- ((ChangedSerializer) this.valSerializer).setInner(context.valueSerde().serializer());
+ if (valSerializer instanceof ChangedSerializer &&
+ ((ChangedSerializer) valSerializer).inner() == null) {
+ ((ChangedSerializer) valSerializer).setInner(context.valueSerde().serializer());
+ }
}
@Override
public void process(final K key, final V value) {
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+ final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
final long timestamp = context.timestamp();
if (timestamp < 0) {
@@ -75,8 +84,8 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
}
try {
- collector.send(topic, key, value, null, timestamp, keySerializer, valSerializer, partitioner);
- } catch (ClassCastException e) {
+ collector.send(topic, key, value, timestamp, keySerializer, valSerializer, partitioner);
+ } catch (final ClassCastException e) {
final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
throw new StreamsException(
@@ -102,7 +111,8 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
/**
* @return a string representation of this node starting with the given indent, useful for debugging.
*/
- public String toString(String indent) {
+ @Override
+ public String toString(final String indent) {
final StringBuilder sb = new StringBuilder(super.toString(indent));
sb.append(indent).append("\ttopic:\t\t");
sb.append(topic);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 00dbb3c..d738a19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -32,25 +32,22 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
private static final RecordCollector NO_OP_COLLECTOR = new RecordCollector() {
@Override
public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
+ final K key,
+ final V value,
+ final Integer partition,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {
}
@Override
public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
- StreamPartitioner<? super K, ? super V> partitioner) {
-
- }
+ final K key,
+ final V value,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner) {}
@Override
public void flush() {
@@ -69,10 +66,10 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
};
public StandbyContextImpl(final TaskId id,
- final String applicationId,
- final StreamsConfig config,
- final ProcessorStateManager stateMgr,
- final StreamsMetrics metrics) {
+ final String applicationId,
+ final StreamsConfig config,
+ final ProcessorStateManager stateMgr,
+ final StreamsMetrics metrics) {
super(id, applicationId, config, metrics, stateMgr, new ThreadCache("zeroCache", 0, metrics));
}
@@ -87,15 +84,15 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
- public StateStore getStateStore(String name) {
+ public StateStore getStateStore(final String name) {
throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
public String topic() {
@@ -103,7 +100,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
public int partition() {
@@ -111,7 +108,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
public long offset() {
@@ -119,7 +116,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
public long timestamp() {
@@ -127,31 +124,31 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
- public <K, V> void forward(K key, V value) {
+ public <K, V> void forward(final K key, final V value) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
- public <K, V> void forward(K key, V value, int childIndex) {
+ public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
- public <K, V> void forward(K key, V value, String childName) {
+ public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
public void commit() {
@@ -159,19 +156,25 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
}
/**
- * @throws UnsupportedOperationException
+ * @throws UnsupportedOperationException on every invocation
*/
@Override
- public void schedule(long interval) {
+ public void schedule(final long interval) {
throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
}
+ /**
+ * @throws UnsupportedOperationException on every invocation
+ */
@Override
public RecordContext recordContext() {
throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks.");
}
+ /**
+ * @throws UnsupportedOperationException on every invocation
+ */
@Override
public void setRecordContext(final RecordContext recordContext) {
throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks.");
@@ -183,6 +186,9 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
// no-op. can't throw as this is called on commit when the StateStores get flushed.
}
+ /**
+ * @throws UnsupportedOperationException on every invocation
+ */
@Override
public ProcessorNode currentNode() {
throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 5798514..1aa7f38 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -43,13 +43,13 @@ import static org.junit.Assert.assertEquals;
public class RecordCollectorTest {
- private List<PartitionInfo> infos = Arrays.asList(
+ private final List<PartitionInfo> infos = Arrays.asList(
new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0])
);
- private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
+ private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
Collections.<String>emptySet(), Collections.<String>emptySet());
@@ -58,7 +58,7 @@ public class RecordCollectorTest {
private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() {
@Override
- public Integer partition(String key, Object value, int numPartitions) {
+ public Integer partition(final String key, final Object value, final int numPartitions) {
return Integer.parseInt(key) % numPartitions;
}
};
@@ -66,7 +66,7 @@ public class RecordCollectorTest {
@Test
public void testSpecificPartition() {
- RecordCollectorImpl collector = new RecordCollectorImpl(
+ final RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
"RecordCollectorTest-TestSpecificPartition");
@@ -79,16 +79,16 @@ public class RecordCollectorTest {
collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
- Map<TopicPartition, Long> offsets = collector.offsets();
+ final Map<TopicPartition, Long> offsets = collector.offsets();
assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 0)));
assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 1)));
assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
// ignore StreamPartitioner
- collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
+ collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0)));
assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
@@ -98,23 +98,23 @@ public class RecordCollectorTest {
@Test
public void testStreamPartitioner() {
- RecordCollectorImpl collector = new RecordCollectorImpl(
+ final RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
"RecordCollectorTest-TestStreamPartitioner");
- collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "9", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "27", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "81", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "243", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "27", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "81", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "243", "0", null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "28", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "82", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "244", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "28", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "82", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "244", "0", null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "245", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "245", "0", null, stringSerializer, stringSerializer, streamPartitioner);
- Map<TopicPartition, Long> offsets = collector.offsets();
+ final Map<TopicPartition, Long> offsets = collector.offsets();
assertEquals((Long) 4L, offsets.get(new TopicPartition("topic1", 0)));
assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
@@ -125,7 +125,7 @@ public class RecordCollectorTest {
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
final AtomicInteger attempt = new AtomicInteger(0);
- RecordCollectorImpl collector = new RecordCollectorImpl(
+ final RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
@@ -137,7 +137,7 @@ public class RecordCollectorTest {
},
"test");
- collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
assertEquals(Long.valueOf(0L), offset);
}
@@ -145,7 +145,7 @@ public class RecordCollectorTest {
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
- RecordCollector collector = new RecordCollectorImpl(
+ final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
@@ -154,7 +154,7 @@ public class RecordCollectorTest {
},
"test");
- collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
@@ -170,8 +170,8 @@ public class RecordCollectorTest {
}
},
"test");
- collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
- collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
@SuppressWarnings("unchecked")
@@ -186,7 +186,7 @@ public class RecordCollectorTest {
}
},
"test");
- collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
collector.flush();
}
@@ -202,8 +202,22 @@ public class RecordCollectorTest {
}
},
"test");
- collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+ collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
collector.close();
}
+ @SuppressWarnings("unchecked")
+ @Test(expected = StreamsException.class)
+ public void shouldThrowIfTopicIsUnknown() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public List<PartitionInfo> partitionsFor(final String topic) {
+ return Collections.EMPTY_LIST;
+ }
+
+ },
+ "test");
+ collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index b758799..a6804e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -45,7 +45,7 @@ import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import java.io.File;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -154,9 +154,9 @@ public class KeyValueStoreTestDriver<K, V> {
* {@code Long.class}, or {@code byte[].class}
* @return the test driver; never null
*/
- public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
- StateSerdes<K, V> serdes = StateSerdes.withBuiltinTypes("unexpected", keyClass, valueClass);
- return new KeyValueStoreTestDriver<K, V>(serdes);
+ public static <K, V> KeyValueStoreTestDriver<K, V> create(final Class<K> keyClass, final Class<V> valueClass) {
+ final StateSerdes<K, V> serdes = StateSerdes.withBuiltinTypes("unexpected", keyClass, valueClass);
+ return new KeyValueStoreTestDriver<>(serdes);
}
/**
@@ -175,11 +175,11 @@ public class KeyValueStoreTestDriver<K, V> {
final Deserializer<K> keyDeserializer,
final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) {
- StateSerdes<K, V> serdes = new StateSerdes<K, V>(
+ final StateSerdes<K, V> serdes = new StateSerdes<>(
"unexpected",
Serdes.serdeFrom(keySerializer, keyDeserializer),
Serdes.serdeFrom(valueSerializer, valueDeserializer));
- return new KeyValueStoreTestDriver<K, V>(serdes);
+ return new KeyValueStoreTestDriver<>(serdes);
}
private final Map<K, V> flushedEntries = new HashMap<>();
@@ -187,53 +187,50 @@ public class KeyValueStoreTestDriver<K, V> {
private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
private final MockProcessorContext context;
private final Map<String, StateStore> storeMap = new HashMap<>();
- private MockTime time = new MockTime();
- private MetricConfig config = new MetricConfig();
- private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
+ private final MockTime time = new MockTime();
+ private final MetricConfig config = new MetricConfig();
+ private final Metrics metrics = new Metrics(config, Collections.singletonList((MetricsReporter) new JmxReporter()), time, true);
private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
private final ThreadCache cache = new ThreadCache("testCache", DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
- private final RecordCollector recordCollector;
private File stateDir = null;
- protected KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
- ByteArraySerializer rawSerializer = new ByteArraySerializer();
- Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
+ private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
+ final ByteArraySerializer rawSerializer = new ByteArraySerializer();
+ final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
- this.recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
+ final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
@SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(final String topic,
- K1 key,
- V1 value,
- Integer partition,
- Long timestamp,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer) {
+ final K1 key,
+ final V1 value,
+ final Integer partition,
+ final Long timestamp,
+ final Serializer<K1> keySerializer,
+ final Serializer<V1> valueSerializer) {
// for byte arrays we need to wrap it for comparison
- K keyTest = serdes.keyFrom(keySerializer.serialize(topic, key));
- V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, value));
+ final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, key));
+ final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, value));
recordFlushed(keyTest, valueTest);
}
@Override
public <K1, V1> void send(final String topic,
- K1 key,
- V1 value,
- Integer partition,
- Long timestamp,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer,
- StreamPartitioner<? super K1, ? super V1> partitioner) {
- // ignore partitioner
- send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
+ final K1 key,
+ final V1 value,
+ final Long timestamp,
+ final Serializer<K1> keySerializer,
+ final Serializer<V1> valueSerializer,
+ final StreamPartitioner<? super K1, ? super V1> partitioner) {
+ throw new UnsupportedOperationException();
}
};
- this.stateDir = TestUtils.tempDirectory();
- this.stateDir.mkdirs();
+ stateDir = TestUtils.tempDirectory();
+ stateDir.mkdirs();
props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
@@ -244,25 +241,25 @@ public class KeyValueStoreTestDriver<K, V> {
- this.context = new MockProcessorContext(this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+ context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
@Override
public TaskId taskId() {
return new TaskId(0, 1);
}
@Override
- public <K1, V1> void forward(K1 key, V1 value, int childIndex) {
+ public <K1, V1> void forward(final K1 key, final V1 value, final int childIndex) {
forward(key, value);
}
@Override
- public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
+ public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback func) {
storeMap.put(store.name(), store);
restoreEntries(func, serdes);
}
@Override
- public StateStore getStateStore(String name) {
+ public StateStore getStateStore(final String name) {
return storeMap.get(name);
}
@@ -282,7 +279,7 @@ public class KeyValueStoreTestDriver<K, V> {
}
@Override
- public Map<String, Object> appConfigsWithPrefix(String prefix) {
+ public Map<String, Object> appConfigsWithPrefix(final String prefix) {
return new StreamsConfig(props).originalsWithPrefix(prefix);
}
@@ -298,7 +295,7 @@ public class KeyValueStoreTestDriver<K, V> {
};
}
- protected void recordFlushed(K key, V value) {
+ private void recordFlushed(final K key, final V value) {
if (value == null) {
// This is a removal ...
flushedRemovals.add(key);
@@ -310,11 +307,11 @@ public class KeyValueStoreTestDriver<K, V> {
}
}
- private void restoreEntries(StateRestoreCallback func, StateSerdes<K, V> serdes) {
- for (KeyValue<K, V> entry : restorableEntries) {
+ private void restoreEntries(final StateRestoreCallback func, final StateSerdes<K, V> serdes) {
+ for (final KeyValue<K, V> entry : restorableEntries) {
if (entry != null) {
- byte[] rawKey = serdes.rawKey(entry.key);
- byte[] rawValue = serdes.rawValue(entry.value);
+ final byte[] rawKey = serdes.rawKey(entry.key);
+ final byte[] rawValue = serdes.rawValue(entry.value);
func.restore(rawKey, rawValue);
}
}
@@ -349,8 +346,8 @@ public class KeyValueStoreTestDriver<K, V> {
* @param value the value for the entry
* @see #checkForRestoredEntries(KeyValueStore)
*/
- public void addEntryToRestoreLog(K key, V value) {
- restorableEntries.add(new KeyValue<K, V>(key, value));
+ public void addEntryToRestoreLog(final K key, final V value) {
+ restorableEntries.add(new KeyValue<>(key, value));
}
/**
@@ -386,11 +383,11 @@ public class KeyValueStoreTestDriver<K, V> {
* @return the number of restore entries missing from the store, or 0 if all restore entries were found
* @see #addEntryToRestoreLog(Object, Object)
*/
- public int checkForRestoredEntries(KeyValueStore<K, V> store) {
+ public int checkForRestoredEntries(final KeyValueStore<K, V> store) {
int missing = 0;
- for (KeyValue<K, V> kv : restorableEntries) {
+ for (final KeyValue<K, V> kv : restorableEntries) {
if (kv != null) {
- V value = store.get(kv.key);
+ final V value = store.get(kv.key);
if (!Objects.equals(value, kv.value)) {
++missing;
}
@@ -405,7 +402,7 @@ public class KeyValueStoreTestDriver<K, V> {
* @param store the key value store using this {@link #context()}.
* @return the number of entries
*/
- public int sizeOf(KeyValueStore<K, V> store) {
+ public int sizeOf(final KeyValueStore<K, V> store) {
int size = 0;
try (KeyValueIterator<K, V> iterator = store.all()) {
while (iterator.hasNext()) {
@@ -421,9 +418,9 @@ public class KeyValueStoreTestDriver<K, V> {
*
* @param key the key
* @return the value that was flushed with the key, or {@code null} if no such key was flushed or if the entry with this
- * key was {@link #flushedEntryStored(Object) removed} upon flush
+ * key was removed upon flush
*/
- public V flushedEntryStored(K key) {
+ public V flushedEntryStored(final K key) {
return flushedEntries.get(key);
}
@@ -434,7 +431,7 @@ public class KeyValueStoreTestDriver<K, V> {
* @return {@code true} if the entry with the given key was removed when flushed, or {@code false} if the entry was not
* removed when last flushed
*/
- public boolean flushedEntryRemoved(K key) {
+ public boolean flushedEntryRemoved(final K key) {
return flushedRemovals.contains(key);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 311eaf6..515fef6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -35,33 +35,30 @@ public class StoreChangeLoggerTest {
private final String topic = "topic";
private final Map<Integer, String> logged = new HashMap<>();
- private final Map<Integer, String> written = new HashMap<>();
private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
@SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(final String topic,
- K1 key,
- V1 value,
- Integer partition,
- Long timestamp,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer) {
+ final K1 key,
+ final V1 value,
+ final Integer partition,
+ final Long timestamp,
+ final Serializer<K1> keySerializer,
+ final Serializer<V1> valueSerializer) {
logged.put((Integer) key, (String) value);
}
@Override
public <K1, V1> void send(final String topic,
- K1 key,
- V1 value,
- Integer partition,
- Long timestamp,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer,
- StreamPartitioner<? super K1, ? super V1> partitioner) {
- // ignore partitioner
- send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
+ final K1 key,
+ final V1 value,
+ final Long timestamp,
+ final Serializer<K1> keySerializer,
+ final Serializer<V1> valueSerializer,
+ final StreamPartitioner<? super K1, ? super V1> partitioner) {
+ throw new UnsupportedOperationException();
}
}
);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index f79da31..5113285 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -46,36 +46,36 @@ public class KStreamTestDriver {
private final MockProcessorContext context;
private final ProcessorTopology globalTopology;
- public KStreamTestDriver(KStreamBuilder builder) {
+ public KStreamTestDriver(final KStreamBuilder builder) {
this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
}
- public KStreamTestDriver(KStreamBuilder builder, File stateDir) {
+ public KStreamTestDriver(final KStreamBuilder builder, final File stateDir) {
this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
}
- public KStreamTestDriver(KStreamBuilder builder, File stateDir, final long cacheSize) {
+ public KStreamTestDriver(final KStreamBuilder builder, final File stateDir, final long cacheSize) {
this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
}
- public KStreamTestDriver(KStreamBuilder builder,
- File stateDir,
- Serde<?> keySerde,
- Serde<?> valSerde) {
+ public KStreamTestDriver(final KStreamBuilder builder,
+ final File stateDir,
+ final Serde<?> keySerde,
+ final Serde<?> valSerde) {
this(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
}
- public KStreamTestDriver(KStreamBuilder builder,
- File stateDir,
- Serde<?> keySerde,
- Serde<?> valSerde,
- long cacheSize) {
+ public KStreamTestDriver(final KStreamBuilder builder,
+ final File stateDir,
+ final Serde<?> keySerde,
+ final Serde<?> valSerde,
+ final long cacheSize) {
builder.setApplicationId("TestDriver");
- this.topology = builder.build(null);
- this.globalTopology = builder.buildGlobalStateTopology();
- ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics()));
- this.context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
- this.context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
+ topology = builder.build(null);
+ globalTopology = builder.buildGlobalStateTopology();
+ final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics()));
+ context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
// init global topology first as it will add stores to the
// store map that are required for joins etc.
if (globalTopology != null) {
@@ -85,11 +85,11 @@ public class KStreamTestDriver {
}
private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
- for (StateStore store : stores) {
+ for (final StateStore store : stores) {
store.init(context, store);
}
- for (ProcessorNode node : topology.processors()) {
+ for (final ProcessorNode node : topology.processors()) {
context.setCurrentNode(node);
try {
node.init(context);
@@ -107,7 +107,7 @@ public class KStreamTestDriver {
return context;
}
- public void process(String topicName, Object key, Object value) {
+ public void process(final String topicName, final Object key, final Object value) {
final ProcessorNode prevNode = context.currentNode();
ProcessorNode currNode = topology.source(topicName);
if (currNode == null && globalTopology != null) {
@@ -128,9 +128,9 @@ public class KStreamTestDriver {
}
}
- public void punctuate(long timestamp) {
+ public void punctuate(final long timestamp) {
final ProcessorNode prevNode = context.currentNode();
- for (ProcessorNode processor : topology.processors()) {
+ for (final ProcessorNode processor : topology.processors()) {
if (processor.processor() != null) {
context.setRecordContext(createRecordContext(timestamp));
context.setCurrentNode(processor);
@@ -143,13 +143,13 @@ public class KStreamTestDriver {
}
}
- public void setTime(long timestamp) {
+ public void setTime(final long timestamp) {
context.setTime(timestamp);
}
public void close() {
// close all processors
- for (ProcessorNode node : topology.processors()) {
+ for (final ProcessorNode node : topology.processors()) {
context.setCurrentNode(node);
try {
node.close();
@@ -162,23 +162,24 @@ public class KStreamTestDriver {
}
public Set<String> allProcessorNames() {
- Set<String> names = new HashSet<>();
+ final Set<String> names = new HashSet<>();
- List<ProcessorNode> nodes = topology.processors();
+ final List<ProcessorNode> nodes = topology.processors();
- for (ProcessorNode node: nodes) {
+ for (final ProcessorNode node: nodes) {
names.add(node.name());
}
return names;
}
- public ProcessorNode processor(String name) {
- List<ProcessorNode> nodes = topology.processors();
+ public ProcessorNode processor(final String name) {
+ final List<ProcessorNode> nodes = topology.processors();
- for (ProcessorNode node: nodes) {
- if (node.name().equals(name))
+ for (final ProcessorNode node: nodes) {
+ if (node.name().equals(name)) {
return node;
+ }
}
return null;
@@ -189,7 +190,7 @@ public class KStreamTestDriver {
}
public void flushState() {
- for (StateStore stateStore : context.allStateStores().values()) {
+ for (final StateStore stateStore : context.allStateStores().values()) {
stateStore.flush();
}
}
@@ -199,12 +200,12 @@ public class KStreamTestDriver {
// of them since the flushing could cause eviction and hence tries to access other stores
flushState();
- for (StateStore stateStore : context.allStateStores().values()) {
+ for (final StateStore stateStore : context.allStateStores().values()) {
stateStore.close();
}
}
- private ProcessorRecordContext createRecordContext(long timestamp) {
+ private ProcessorRecordContext createRecordContext(final long timestamp) {
return new ProcessorRecordContext(timestamp, -1, -1, "topic");
}
@@ -215,25 +216,24 @@ public class KStreamTestDriver {
@Override
public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
- StreamPartitioner<? super K, ? super V> partitioner) {
+ final K key,
+ final V value,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner) {
// The serialization is skipped.
process(topic, key, value);
}
@Override
public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
+ final K key,
+ final V value,
+ final Integer partition,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {
// The serialization is skipped.
process(topic, key, value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 3349ae1..05175f9 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -28,26 +28,21 @@ public class NoOpRecordCollector implements RecordCollector {
@Override
public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer) {
- // no-op
- }
+ final K key,
+ final V value,
+ final Integer partition,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {}
@Override
public <K, V> void send(final String topic,
- K key,
- V value,
- Integer partition,
- Long timestamp,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
- StreamPartitioner<? super K, ? super V> partitioner) {
- // no-op
- }
+ final K key,
+ final V value,
+ final Long timestamp,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner) {}
@Override
public void flush() {