You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:41 UTC
[06/13] beam git commit: Improve how we use MockProducer in tests.
Remove the global instance and use a map so that Beam and the test verifier share
the same producer instance.
Improve how we use MockProducer in tests.
Remove the global instance and use a map so that Beam and the test
verifier share the same producer instance.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bd4085d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bd4085d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bd4085d
Branch: refs/heads/master
Commit: 3bd4085db154241c477f8cc318162530029456fb
Parents: 236484b
Author: Raghu Angadi <ra...@google.com>
Authored: Tue Aug 22 14:37:59 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +-
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 208 ++++++++++++-------
2 files changed, 139 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3bd4085d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 78227a0..1d9560b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1811,7 +1811,7 @@ public class KafkaIO {
// and id, it implies that record and the associated id are checkpointed to persistent
// storage and this record will always have same id, even in retries.
// Exactly-once semantics are achieved by writing records in the strict order of
- // these checkpointed sequence ids.
+ // these check-pointed sequence ids.
//
// Parallelism for B and C is fixed to 'numShards', which defaults to number of partitions
// for the topic. A few reasons for that:
http://git-wip-us.apache.org/repos/asf/beam/blob/3bd4085d/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index eaf30d6..88c9d21 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -34,13 +34,17 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -180,7 +184,7 @@ public class KafkaIOTest {
//1. SpEL can find this function, either input is List or Collection;
//2. List extends Collection, so super.assign() could find either assign(List)
// or assign(Collection).
- public void assign(final List<TopicPartition> assigned) {
+ public void assign(final Collection<TopicPartition> assigned) {
super.assign(assigned);
assignedPartitions.set(ImmutableList.copyOf(assigned));
for (TopicPartition tp : assigned) {
@@ -724,11 +728,10 @@ public class KafkaIOTest {
int numElements = 1000;
- synchronized (MOCK_PRODUCER_LOCK) {
+ try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
- MOCK_PRODUCER.clear();
-
- ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+ ProducerSendCompletionThread completionThread =
+ new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
String topic = "test";
@@ -740,32 +743,26 @@ public class KafkaIOTest {
.withTopic(topic)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(LongSerializer.class)
- .withEOS()
- .withSinkGroupId("test")
- .withNumShards(1)
- .withConsumerFactoryFn(new ConsumerFactoryFn(
- Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
- .withProducerFactoryFn(new ProducerFactoryFn()));
+ .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
p.run();
completionThread.shutdown();
- verifyProducerRecords(topic, numElements, false);
+ verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
}
}
- //@Test (TODO: reenable. global MOCK_PRODUCER does not work with 0.11 anymore for multiple tests)
+ @Test
public void testValuesSink() throws Exception {
// similar to testSink(), but use values()' interface.
int numElements = 1000;
- synchronized (MOCK_PRODUCER_LOCK) {
-
- MOCK_PRODUCER.clear();
+ try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
- ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+ ProducerSendCompletionThread completionThread =
+ new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
String topic = "test";
@@ -777,18 +774,56 @@ public class KafkaIOTest {
.withBootstrapServers("none")
.withTopic(topic)
.withValueSerializer(LongSerializer.class)
- .withProducerFactoryFn(new ProducerFactoryFn())
+ .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))
.values());
p.run();
completionThread.shutdown();
- verifyProducerRecords(topic, numElements, true);
+ verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true);
+ }
+ }
+
+ @Test
+ public void testEOSink() throws Exception {
+ // similar to testSink(), but with EOS (exactly-once semantics) enabled.
+ // This does not actually verify exactly-once semantics; it mainly exercises the code path.
+
+ int numElements = 1000;
+
+ try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+
+ ProducerSendCompletionThread completionThread =
+ new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
+
+ String topic = "test";
+
+ p
+ .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+ .withoutMetadata())
+ .apply(KafkaIO.<Integer, Long>write()
+ .withBootstrapServers("none")
+ .withTopic(topic)
+ .withKeySerializer(IntegerSerializer.class)
+ .withValueSerializer(LongSerializer.class)
+ .withEOS()
+ .withSinkGroupId("test")
+ .withNumShards(1)
+ .withConsumerFactoryFn(new ConsumerFactoryFn(
+ Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
+ .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
+
+ p.run();
+
+ completionThread.shutdown();
+
+ verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
}
}
- //@Test (TODO: reenable. global MOCK_PRODUCER does not work with 0.11 anymore for multiple tests)
+
+ @Test
public void testSinkWithSendErrors() throws Throwable {
// similar to testSink(), except that up to 10 of the send calls to producer will fail
// asynchronously.
@@ -802,15 +837,13 @@ public class KafkaIOTest {
int numElements = 1000;
- synchronized (MOCK_PRODUCER_LOCK) {
+ try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
- MOCK_PRODUCER.clear();
+ ProducerSendCompletionThread completionThreadWithErrors =
+ new ProducerSendCompletionThread(producerWrapper.mockProducer, 10, 100).start();
String topic = "test";
- ProducerSendCompletionThread completionThreadWithErrors =
- new ProducerSendCompletionThread(10, 100).start();
-
p
.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
.withoutMetadata())
@@ -819,7 +852,7 @@ public class KafkaIOTest {
.withTopic(topic)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(LongSerializer.class)
- .withProducerFactoryFn(new ProducerFactoryFn()));
+ .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
try {
p.run();
@@ -910,19 +943,21 @@ public class KafkaIOTest {
assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
}
- //@Test (TODO: reenable. global MOCK_PRODUCER does not work with 0.11 anymore for multiple tests)
+ @Test
public void testSinkDisplayData() {
- KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
+ try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+ KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
.withBootstrapServers("myServerA:9092,myServerB:9092")
.withTopic("myTopic")
.withValueSerializer(LongSerializer.class)
- .withProducerFactoryFn(new ProducerFactoryFn());
+ .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey));
- DisplayData displayData = DisplayData.from(write);
+ DisplayData displayData = DisplayData.from(write);
- assertThat(displayData, hasDisplayItem("topic", "myTopic"));
- assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
- assertThat(displayData, hasDisplayItem("retries", 3));
+ assertThat(displayData, hasDisplayItem("topic", "myTopic"));
+ assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
+ assertThat(displayData, hasDisplayItem("retries", 3));
+ }
}
// interface for testing coder inference
@@ -1002,17 +1037,16 @@ public class KafkaIOTest {
KafkaIO.inferCoder(registry, NonInferableObjectDeserializer.class);
}
- // @Test (TODO: reenable. global MOCK_PRODUCER does not work with 0.11 anymore for multiple tests)
+ @Test
public void testSinkMetrics() throws Exception {
// Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.
int numElements = 1000;
- synchronized (MOCK_PRODUCER_LOCK) {
+ try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
- MOCK_PRODUCER.clear();
-
- ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+ ProducerSendCompletionThread completionThread =
+ new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
String topic = "test";
@@ -1024,7 +1058,7 @@ public class KafkaIOTest {
.withTopic(topic)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(LongSerializer.class)
- .withProducerFactoryFn(new ProducerFactoryFn()));
+ .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
PipelineResult result = p.run();
@@ -1047,10 +1081,11 @@ public class KafkaIOTest {
}
}
- private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) {
+ private static void verifyProducerRecords(MockProducer<Integer, Long> mockProducer,
+ String topic, int numElements, boolean keyIsAbsent) {
// verify that appropriate messages are written to kafka
- List<ProducerRecord<Integer, Long>> sent = MOCK_PRODUCER.history();
+ List<ProducerRecord<Integer, Long>> sent = mockProducer.history();
// sort by values
Collections.sort(sent, new Comparator<ProducerRecord<Integer, Long>>() {
@@ -1073,44 +1108,67 @@ public class KafkaIOTest {
}
/**
- * Singleton MockProudcer. Using a singleton here since we need access to the object to fetch
- * the actual records published to the producer. This prohibits running the tests using
- * the producer in parallel. These tests only take a few millis each.
+ * This wrapper over MockProducer registers the mock producer in the global MOCK_PRODUCER_MAP
+ * under a random key, so that the producer returned by ProducerFactoryFn during the pipeline
+ * run can be used for verification after the run. We also override {@code flush()} in
+ * MockProducer so the test controls how {@code send()} calls complete (e.g. inject errors).
*/
- private static final MockProducer<Integer, Long> MOCK_PRODUCER =
- new MockProducer<Integer, Long>(
- false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
- new IntegerSerializer(),
- new LongSerializer()) {
+ private static class MockProducerWrapper implements AutoCloseable {
- // override flush() so that it does not complete all the waiting sends, giving a chance to
- // ProducerCompletionThread to inject errors.
+ final String producerKey;
+ final MockProducer<Integer, Long> mockProducer;
- @Override
- public void flush() {
- while (completeNext()) {
- // there are some uncompleted records. let the completion thread handle them.
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
+ MockProducerWrapper() {
+ producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
+ mockProducer = new MockProducer<Integer, Long>(
+ false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
+ new IntegerSerializer(),
+ new LongSerializer()) {
+
+ // override flush() so that it completes pending sends one at a time with a short sleep in
+ // between, giving ProducerSendCompletionThread a chance to inject errors.
+
+ @Override
+ public void flush() {
+ while (completeNext()) {
+ // there are some uncompleted records. let the completion thread handle them.
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // ok to retry.
+ }
}
}
+ };
+
+ // Add the producer to the global map so that producer factory function can access it.
+ assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
+ }
+
+ public void close() {
+ MOCK_PRODUCER_MAP.remove(producerKey);
+ if (!mockProducer.closed()) {
+ mockProducer.close();
}
- };
+ }
+ }
- // use a separate object serialize tests using MOCK_PRODUCER so that we don't interfere
- // with Kafka MockProducer locking itself.
- private static final Object MOCK_PRODUCER_LOCK = new Object();
+ private static final ConcurrentMap<String, MockProducer<Integer, Long>> MOCK_PRODUCER_MAP =
+ new ConcurrentHashMap<>();
private static class ProducerFactoryFn
implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {
+ final String producerKey;
+
+ ProducerFactoryFn(String producerKey) {
+ this.producerKey = producerKey;
+ }
@SuppressWarnings("unchecked")
@Override
public Producer<Integer, Long> apply(Map<String, Object> config) {
// Make sure the config is correctly set up for serializers.
-
// There may not be a key serializer if we're interested only in values.
if (config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) != null) {
Utils.newInstance(
@@ -1124,12 +1182,17 @@ public class KafkaIOTest {
.asSubclass(Serializer.class)
).configure(config, false);
- return MOCK_PRODUCER;
+ // Returning the same producer to every sink instance in a pipeline seems to work fine now.
+ // If DirectRunner creates multiple DoFn instances for sinks, we might need to handle that,
+ // i.e. allow multiple producers per producerKey and concatenate all the messages written
+ // to each producer for verification after the pipeline finishes.
+
+ return MOCK_PRODUCER_MAP.get(producerKey);
}
}
private static class InjectedErrorException extends RuntimeException {
- public InjectedErrorException(String message) {
+ InjectedErrorException(String message) {
super(message);
}
}
@@ -1142,18 +1205,22 @@ public class KafkaIOTest {
*/
private static class ProducerSendCompletionThread {
+ private final MockProducer<Integer, Long> mockProducer;
private final int maxErrors;
private final int errorFrequency;
private final AtomicBoolean done = new AtomicBoolean(false);
private final ExecutorService injectorThread;
private int numCompletions = 0;
- ProducerSendCompletionThread() {
+ ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer) {
// complete everything successfully
- this(0, 0);
+ this(mockProducer, 0, 0);
}
- ProducerSendCompletionThread(final int maxErrors, final int errorFrequency) {
+ ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer,
+ int maxErrors,
+ int errorFrequency) {
+ this.mockProducer = mockProducer;
this.maxErrors = maxErrors;
this.errorFrequency = errorFrequency;
injectorThread = Executors.newSingleThreadExecutor();
@@ -1169,14 +1236,14 @@ public class KafkaIOTest {
boolean successful;
if (errorsInjected < maxErrors && ((numCompletions + 1) % errorFrequency) == 0) {
- successful = MOCK_PRODUCER.errorNext(
+ successful = mockProducer.errorNext(
new InjectedErrorException("Injected Error #" + (errorsInjected + 1)));
if (successful) {
errorsInjected++;
}
} else {
- successful = MOCK_PRODUCER.completeNext();
+ successful = mockProducer.completeNext();
}
if (successful) {
@@ -1186,6 +1253,7 @@ public class KafkaIOTest {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
+ // ok to retry.
}
}
}