You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/28 06:28:53 UTC
kafka git commit: KAFKA-4986 Follow-up: cleanup unit tests and
further comments addressed
Repository: kafka
Updated Branches:
refs/heads/trunk bc10f5f17 -> a931e9954
KAFKA-4986 Follow-up: cleanup unit tests and further comments addressed
- addressing open GitHub comments from #2773
- test clean-up
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy, Guozhang Wang
Closes #2854 from mjsax/kafka-4986-producer-per-task-follow-up
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a931e995
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a931e995
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a931e995
Branch: refs/heads/trunk
Commit: a931e9954d27f4c9f12fd89afd6f0fe523cf35e8
Parents: bc10f5f
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Apr 27 23:28:50 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 27 23:28:50 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/MockProducer.java | 38 +++++++++--
.../org/apache/kafka/streams/StreamsConfig.java | 16 ++++-
.../streams/processor/internals/StreamTask.java | 9 ++-
.../processor/internals/StreamThread.java | 2 +-
.../processor/internals/StreamTaskTest.java | 4 +-
.../processor/internals/StreamThreadTest.java | 69 ++++++--------------
.../apache/kafka/test/MockClientSupplier.java | 17 ++---
7 files changed, 83 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 80ea372..a4f59ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -55,6 +55,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
private Map<TopicPartition, Long> offsets;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
+ private boolean closed;
/**
* Create a mock producer
@@ -68,7 +69,11 @@ public class MockProducer<K, V> implements Producer<K, V> {
* @param keySerializer The serializer for key that implements {@link Serializer}.
* @param valueSerializer The serializer for value that implements {@link Serializer}.
*/
- public MockProducer(Cluster cluster, boolean autoComplete, Partitioner partitioner, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ public MockProducer(final Cluster cluster,
+ final boolean autoComplete,
+ final Partitioner partitioner,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {
this.cluster = cluster;
this.autoComplete = autoComplete;
this.partitioner = partitioner;
@@ -80,23 +85,37 @@ public class MockProducer<K, V> implements Producer<K, V> {
}
/**
- * Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers
+ * Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers.
*
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
*/
- public MockProducer(boolean autoComplete, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ public MockProducer(final boolean autoComplete,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {
this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
}
/**
- * Create a new mock producer with invented metadata the given autoComplete setting, partitioner and key\value serializers
+ * Create a new mock producer with invented metadata the given autoComplete setting, partitioner and key\value serializers.
*
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, partitioner, keySerializer, valueSerializer)}
*/
- public MockProducer(boolean autoComplete, Partitioner partitioner, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ public MockProducer(final boolean autoComplete,
+ final Partitioner partitioner,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {
this(Cluster.empty(), autoComplete, partitioner, keySerializer, valueSerializer);
}
+ /**
+ * Create a new mock producer with invented metadata.
+ *
+ * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), false, null, null, null)}
+ */
+ public MockProducer() {
+ this(Cluster.empty(), false, null, null, null);
+ }
+
public void initTransactions() {
}
@@ -183,10 +202,19 @@ public class MockProducer<K, V> implements Producer<K, V> {
@Override
public void close() {
+ close(0, null);
}
@Override
public void close(long timeout, TimeUnit timeUnit) {
+ if (closed) {
+ throw new IllegalStateException("MockedProducer is already closed.");
+ }
+ closed = true;
+ }
+
+ public boolean closed() {
+ return closed;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index a04d7f3..35e6e3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -94,6 +94,16 @@ public class StreamsConfig extends AbstractConfig {
*/
public static final String PRODUCER_PREFIX = "producer.";
+ /**
+ * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
+ */
+ public static final String AT_LEAST_ONCE = "at_least_once";
+
+ /**
+ * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees.
+ */
+ public static final String EXACTLY_ONCE = "exactly_once";
+
/** {@code application.id} */
public static final String APPLICATION_ID_CONFIG = "application.id";
private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
@@ -162,7 +172,7 @@ public class StreamsConfig extends AbstractConfig {
/** {@code cache.max.bytes.buffering} */
public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
- private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>at_least_once</code> (default) and <code>exactly_once</code>.";
+ private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>" + AT_LEAST_ONCE + "</code> (default) and <code>" + EXACTLY_ONCE + "</code>.";
/** {@code receive.buffer.bytes} */
public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
@@ -397,8 +407,8 @@ public class StreamsConfig extends AbstractConfig {
REQUEST_TIMEOUT_MS_DOC)
.define(PROCESSING_GUARANTEE_CONFIG,
ConfigDef.Type.STRING,
- "at_least_once",
- in("at_least_once", "exactly_once"),
+ AT_LEAST_ONCE,
+ in(AT_LEAST_ONCE, EXACTLY_ONCE),
Importance.MEDIUM,
PROCESSING_GUARANTEE_DOC);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8b60b2f..d18efef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
@@ -109,7 +108,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache);
punctuationQueue = new PunctuationQueue();
maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
- exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once");
+ exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
this.metrics = new TaskMetrics(metrics);
// create queues for each assigned partition and associate them
@@ -451,9 +450,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
return processorContext;
}
- // visible for testing only
- Producer<byte[], byte[]> producer() {
- return ((RecordCollectorImpl) recordCollector).producer();
+ // for testing only
+ RecordCollector recordCollector() {
+ return recordCollector;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 989aba8..7918196 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -451,7 +451,7 @@ public class StreamThread extends Thread {
log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes.", logPrefix);
}
cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
- exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once");
+ exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
// set the consumer clients
http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 50cc364..e9c4ef9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -555,14 +555,14 @@ public class StreamTaskTest {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
final StreamsConfig config = new StreamsConfig(properties);
- final MockedProducer producer = new MockedProducer(null);
+ final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
changelogReader, config, streamsMetrics, stateDirectory, null, time, new RecordCollectorImpl(producer, "taskId"));
task.close();
- assertTrue(producer.closed);
+ assertTrue(producer.closed());
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 70433b4..5b44260 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -55,7 +55,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -211,7 +210,7 @@ public class StreamThreadTest {
final ProcessorTopology topology = builder.build(id.topicGroupId);
return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
- mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+ mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
}
};
@@ -496,7 +495,7 @@ public class StreamThreadTest {
protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
final ProcessorTopology topology = builder.build(id.topicGroupId);
return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
- mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+ mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
}
};
@@ -626,7 +625,7 @@ public class StreamThreadTest {
protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
final ProcessorTopology topology = builder.build(id.topicGroupId);
return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
- mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+ mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
}
};
@@ -706,10 +705,11 @@ public class StreamThreadTest {
thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
- assertEquals(1, clientSupplier.numberOfCreatedProducers);
- assertSame(clientSupplier.producer, thread.threadProducer);
+ assertEquals(1, clientSupplier.producers.size());
+ final Producer globalProducer = clientSupplier.producers.get(0);
+ assertSame(globalProducer, thread.threadProducer);
for (final StreamTask task : thread.tasks().values()) {
- assertSame(clientSupplier.producer, task.producer());
+ assertSame(globalProducer, ((RecordCollectorImpl) task.recordCollector()).producer());
}
assertSame(clientSupplier.consumer, thread.consumer);
assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
@@ -719,9 +719,9 @@ public class StreamThreadTest {
public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() {
final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
final Properties properties = configProps();
- properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+ properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
final StreamsConfig config = new StreamsConfig(properties);
- final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier();
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
final StreamThread thread = new StreamThread(
builder,
config,
@@ -745,34 +745,22 @@ public class StreamThreadTest {
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertNull(thread.threadProducer);
- assertEquals(thread.tasks().size(), clientSupplier.numberOfCreatedProducers);
+ assertEquals(thread.tasks().size(), clientSupplier.producers.size());
final Iterator it = clientSupplier.producers.iterator();
for (final StreamTask task : thread.tasks().values()) {
- assertSame(it.next(), task.producer());
+ assertSame(it.next(), ((RecordCollectorImpl) task.recordCollector()).producer());
}
assertSame(clientSupplier.consumer, thread.consumer);
assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
}
- private static class EoSMockClientSupplier extends MockClientSupplier {
- final List<Producer> producers = new LinkedList<>();
-
- @Override
- public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
- final Producer<byte[], byte[]> producer = new MockedProducer<>();
- producers.add(producer);
- ++numberOfCreatedProducers;
- return producer;
- }
- }
-
@Test
public void shouldCloseAllTaskProducers() {
final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
final Properties properties = configProps();
- properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+ properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
final StreamsConfig config = new StreamsConfig(properties);
- final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier();
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
final StreamThread thread = new StreamThread(
builder,
config,
@@ -796,7 +784,7 @@ public class StreamThreadTest {
thread.run();
for (final StreamTask task : thread.tasks().values()) {
- assertTrue(((MockedProducer) task.producer()).closed);
+ assertTrue(((MockProducer) ((RecordCollectorImpl) task.recordCollector()).producer()).closed());
}
}
@@ -804,7 +792,7 @@ public class StreamThreadTest {
public void shouldCloseThreadProducer() {
final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
final StreamsConfig config = new StreamsConfig(configProps());
- final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier();
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
final StreamThread thread = new StreamThread(
builder,
config,
@@ -827,7 +815,7 @@ public class StreamThreadTest {
thread.close();
thread.run();
- assertTrue(((MockedProducer) thread.threadProducer).closed);
+ assertTrue(((MockProducer) thread.threadProducer).closed());
}
@Test
@@ -985,7 +973,7 @@ public class StreamThreadTest {
protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
final ProcessorTopology topology = builder.build(id.topicGroupId);
final TestStreamTask task = new TestStreamTask(id, applicationId, partitions, topology, consumer,
- mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+ mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
createdTasks.put(partitions, task);
return task;
}
@@ -1036,7 +1024,7 @@ public class StreamThreadTest {
Utils.mkSet(new TopicPartition("t1", 0)),
builder.build(0),
clientSupplier.consumer,
- clientSupplier.producer,
+ clientSupplier.getProducer(new HashMap()),
clientSupplier.restoreConsumer,
config,
new MockStreamsMetrics(new Metrics()),
@@ -1088,7 +1076,7 @@ public class StreamThreadTest {
Utils.mkSet(new TopicPartition("t1", 0)),
builder.build(0),
clientSupplier.consumer,
- clientSupplier.producer,
+ clientSupplier.getProducer(new HashMap()),
clientSupplier.restoreConsumer,
config,
new MockStreamsMetrics(new Metrics()),
@@ -1140,7 +1128,7 @@ public class StreamThreadTest {
Utils.mkSet(new TopicPartition("t1", 0)),
builder.build(0),
clientSupplier.consumer,
- clientSupplier.producer,
+ clientSupplier.getProducer(new HashMap()),
clientSupplier.restoreConsumer,
config,
new MockStreamsMetrics(new Metrics()),
@@ -1191,7 +1179,7 @@ public class StreamThreadTest {
Utils.mkSet(new TopicPartition("t1", 0)),
builder.build(0),
clientSupplier.consumer,
- clientSupplier.producer,
+ clientSupplier.getProducer(new HashMap()),
clientSupplier.restoreConsumer,
config,
new MockStreamsMetrics(new Metrics()),
@@ -1263,19 +1251,4 @@ public class StreamThreadTest {
}
}
- private final static class MockedProducer<K, V> extends MockProducer<K, V> {
- boolean closed = false;
-
- MockedProducer() {
- super(false, null, null);
- }
-
- @Override
- public void close() {
- if (closed) {
- throw new IllegalStateException("MockedProducer is already closed.");
- }
- closed = true;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 4afd442..531fdb6 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -24,31 +24,32 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.KafkaClientSupplier;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
public class MockClientSupplier implements KafkaClientSupplier {
private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
- public int numberOfCreatedProducers = 0;
-
- public final MockProducer<byte[], byte[]> producer =
- new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+ public final List<Producer> producers = new LinkedList<>();
+
@Override
- public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
- ++numberOfCreatedProducers;
+ public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
+ final Producer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
+ producers.add(producer);
return producer;
}
@Override
- public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
+ public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
return consumer;
}
@Override
- public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
+ public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
return restoreConsumer;
}