You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:41 UTC

[06/13] beam git commit: Improve how we use MockProducer in tests. Remove the global instance and use a map so that the same producer instance is shared by the Beam pipeline and the test verifier.

Improve how we use MockProducer in tests.
Remove the global instance and use a map so that the same producer
instance is shared by the Beam pipeline and the test verifier.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bd4085d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bd4085d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bd4085d

Branch: refs/heads/master
Commit: 3bd4085db154241c477f8cc318162530029456fb
Parents: 236484b
Author: Raghu Angadi <ra...@google.com>
Authored: Tue Aug 22 14:37:59 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |   2 +-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 208 ++++++++++++-------
 2 files changed, 139 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3bd4085d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 78227a0..1d9560b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1811,7 +1811,7 @@ public class KafkaIO {
     //     and id, it implies that record and the associated id are checkpointed to persistent
     //     storage and this record will always have same id, even in retries.
     //     Exactly-once semantics are achieved by writing records in the strict order of
-    //     these checkpointed sequence ids.
+    //     these check-pointed sequence ids.
     //
     // Parallelism for B and C is fixed to 'numShards', which defaults to number of partitions
     // for the topic. A few reasons for that:

http://git-wip-us.apache.org/repos/asf/beam/blob/3bd4085d/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index eaf30d6..88c9d21 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -34,13 +34,17 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -180,7 +184,7 @@ public class KafkaIOTest {
           //1. SpEL can find this function, either input is List or Collection;
           //2. List extends Collection, so super.assign() could find either assign(List)
           //  or assign(Collection).
-          public void assign(final List<TopicPartition> assigned) {
+          public void assign(final Collection<TopicPartition> assigned) {
             super.assign(assigned);
             assignedPartitions.set(ImmutableList.copyOf(assigned));
             for (TopicPartition tp : assigned) {
@@ -724,11 +728,10 @@ public class KafkaIOTest {
 
     int numElements = 1000;
 
-    synchronized (MOCK_PRODUCER_LOCK) {
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
 
-      MOCK_PRODUCER.clear();
-
-      ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+      ProducerSendCompletionThread completionThread =
+        new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
 
       String topic = "test";
 
@@ -740,32 +743,26 @@ public class KafkaIOTest {
             .withTopic(topic)
             .withKeySerializer(IntegerSerializer.class)
             .withValueSerializer(LongSerializer.class)
-            .withEOS()
-            .withSinkGroupId("test")
-            .withNumShards(1)
-            .withConsumerFactoryFn(new ConsumerFactoryFn(
-              Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
-            .withProducerFactoryFn(new ProducerFactoryFn()));
+            .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
 
       p.run();
 
       completionThread.shutdown();
 
-      verifyProducerRecords(topic, numElements, false);
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
     }
   }
 
-  //@Test (TODO: reenable. global MOCK_PRODUCER does not work with 0.11 anymore for multiple tests)
+  @Test
   public void testValuesSink() throws Exception {
     // similar to testSink(), but use values()' interface.
 
     int numElements = 1000;
 
-    synchronized (MOCK_PRODUCER_LOCK) {
-
-      MOCK_PRODUCER.clear();
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
 
-      ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+      ProducerSendCompletionThread completionThread =
+        new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
 
       String topic = "test";
 
@@ -777,18 +774,56 @@ public class KafkaIOTest {
             .withBootstrapServers("none")
             .withTopic(topic)
             .withValueSerializer(LongSerializer.class)
-            .withProducerFactoryFn(new ProducerFactoryFn())
+            .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))
             .values());
 
       p.run();
 
       completionThread.shutdown();
 
-      verifyProducerRecords(topic, numElements, true);
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true);
+    }
+  }
+
+  @Test
+  public void testEOSink() throws Exception {
+    // similar to testSink(), but enables EOS.
+    // This does not actually test exactly-once-semantics. Mainly exercises the code.
+
+    int numElements = 1000;
+
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+
+      ProducerSendCompletionThread completionThread =
+        new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
+
+      String topic = "test";
+
+      p
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+                 .withoutMetadata())
+        .apply(KafkaIO.<Integer, Long>write()
+                 .withBootstrapServers("none")
+                 .withTopic(topic)
+                 .withKeySerializer(IntegerSerializer.class)
+                 .withValueSerializer(LongSerializer.class)
+                 .withEOS()
+                 .withSinkGroupId("test")
+                 .withNumShards(1)
+                 .withConsumerFactoryFn(new ConsumerFactoryFn(
+                   Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
+                 .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
+
+      p.run();
+
+      completionThread.shutdown();
+
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
     }
   }
 
-  //@Test (TODO: reenable. global MOCK_PRODUCER does not work with 0.11 anymore for multiple tests)
+
+  @Test
   public void testSinkWithSendErrors() throws Throwable {
     // similar to testSink(), except that up to 10 of the send calls to producer will fail
     // asynchronously.
@@ -802,15 +837,13 @@ public class KafkaIOTest {
 
     int numElements = 1000;
 
-    synchronized (MOCK_PRODUCER_LOCK) {
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
 
-      MOCK_PRODUCER.clear();
+      ProducerSendCompletionThread completionThreadWithErrors =
+        new ProducerSendCompletionThread(producerWrapper.mockProducer, 10, 100).start();
 
       String topic = "test";
 
-      ProducerSendCompletionThread completionThreadWithErrors =
-          new ProducerSendCompletionThread(10, 100).start();
-
       p
         .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
             .withoutMetadata())
@@ -819,7 +852,7 @@ public class KafkaIOTest {
             .withTopic(topic)
             .withKeySerializer(IntegerSerializer.class)
             .withValueSerializer(LongSerializer.class)
-            .withProducerFactoryFn(new ProducerFactoryFn()));
+            .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
 
       try {
         p.run();
@@ -910,19 +943,21 @@ public class KafkaIOTest {
     assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
   }
 
-  //@Test (TODO: reenable. global MOCK_PRODUCER does not work with 0.11 anymore for multiple tests)
+  @Test
   public void testSinkDisplayData() {
-    KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+      KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
         .withBootstrapServers("myServerA:9092,myServerB:9092")
         .withTopic("myTopic")
         .withValueSerializer(LongSerializer.class)
-        .withProducerFactoryFn(new ProducerFactoryFn());
+        .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey));
 
-    DisplayData displayData = DisplayData.from(write);
+      DisplayData displayData = DisplayData.from(write);
 
-    assertThat(displayData, hasDisplayItem("topic", "myTopic"));
-    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
-    assertThat(displayData, hasDisplayItem("retries", 3));
+      assertThat(displayData, hasDisplayItem("topic", "myTopic"));
+      assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
+      assertThat(displayData, hasDisplayItem("retries", 3));
+    }
   }
 
   // interface for testing coder inference
@@ -1002,17 +1037,16 @@ public class KafkaIOTest {
     KafkaIO.inferCoder(registry, NonInferableObjectDeserializer.class);
   }
 
-  // @Test (TODO: reenable. global MOCK_PRODUCER does not work with 0.11 anymore for multiple tests)
+  @Test
   public void testSinkMetrics() throws Exception {
     // Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.
 
     int numElements = 1000;
 
-    synchronized (MOCK_PRODUCER_LOCK) {
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
 
-      MOCK_PRODUCER.clear();
-
-      ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+      ProducerSendCompletionThread completionThread =
+        new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
 
       String topic = "test";
 
@@ -1024,7 +1058,7 @@ public class KafkaIOTest {
               .withTopic(topic)
               .withKeySerializer(IntegerSerializer.class)
               .withValueSerializer(LongSerializer.class)
-              .withProducerFactoryFn(new ProducerFactoryFn()));
+              .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
 
       PipelineResult result = p.run();
 
@@ -1047,10 +1081,11 @@ public class KafkaIOTest {
     }
   }
 
-  private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) {
+  private static void verifyProducerRecords(MockProducer<Integer, Long> mockProducer,
+                                            String topic, int numElements, boolean keyIsAbsent) {
 
     // verify that appropriate messages are written to kafka
-    List<ProducerRecord<Integer, Long>> sent = MOCK_PRODUCER.history();
+    List<ProducerRecord<Integer, Long>> sent = mockProducer.history();
 
     // sort by values
     Collections.sort(sent, new Comparator<ProducerRecord<Integer, Long>>() {
@@ -1073,44 +1108,67 @@ public class KafkaIOTest {
   }
 
   /**
-   * Singleton MockProudcer. Using a singleton here since we need access to the object to fetch
-   * the actual records published to the producer. This prohibits running the tests using
-   * the producer in parallel. These tests only take a few millis each.
+   * This wrapper over MockProducer places the mock producer in the global MOCK_PRODUCER_MAP. The
+   * map is needed so that the producer returned by ProducerFactoryFn during the pipeline run can
+   * be used for verification after the test. We also override the {@code flush()} method of
+   * MockProducer so that the test can control behavior of {@code send()} (e.g. inject errors).
    */
-  private static final MockProducer<Integer, Long> MOCK_PRODUCER =
-    new MockProducer<Integer, Long>(
-      false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
-      new IntegerSerializer(),
-      new LongSerializer()) {
+  private static class MockProducerWrapper implements AutoCloseable {
 
-      // override flush() so that it does not complete all the waiting sends, giving a chance to
-      // ProducerCompletionThread to inject errors.
+    final String producerKey;
+    final MockProducer<Integer, Long> mockProducer;
 
-      @Override
-      public void flush() {
-        while (completeNext()) {
-          // there are some uncompleted records. let the completion thread handle them.
-          try {
-            Thread.sleep(10);
-          } catch (InterruptedException e) {
+    MockProducerWrapper() {
+      producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
+      mockProducer = new MockProducer<Integer, Long>(
+        false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
+        new IntegerSerializer(),
+        new LongSerializer()) {
+
+        // override flush() so that it does not complete all the waiting sends, giving a chance to
+        // ProducerSendCompletionThread to inject errors.
+
+        @Override
+        public void flush() {
+          while (completeNext()) {
+            // there are some uncompleted records. let the completion thread handle them.
+            try {
+              Thread.sleep(10);
+            } catch (InterruptedException e) {
+              // ok to retry.
+            }
           }
         }
+      };
+
+      // Add the producer to the global map so that producer factory function can access it.
+      assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
+    }
+
+    public void close() {
+      MOCK_PRODUCER_MAP.remove(producerKey);
+      if (!mockProducer.closed()) {
+        mockProducer.close();
       }
-    };
+    }
+  }
 
-  // use a separate object serialize tests using MOCK_PRODUCER so that we don't interfere
-  // with Kafka MockProducer locking itself.
-  private static final Object MOCK_PRODUCER_LOCK = new Object();
+  private static final ConcurrentMap<String, MockProducer<Integer, Long>> MOCK_PRODUCER_MAP =
+    new ConcurrentHashMap<>();
 
   private static class ProducerFactoryFn
     implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {
+    final String producerKey;
+
+    ProducerFactoryFn(String producerKey) {
+      this.producerKey = producerKey;
+    }
 
     @SuppressWarnings("unchecked")
     @Override
     public Producer<Integer, Long> apply(Map<String, Object> config) {
 
       // Make sure the config is correctly set up for serializers.
-
       // There may not be a key serializer if we're interested only in values.
       if (config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) != null) {
         Utils.newInstance(
@@ -1124,12 +1182,17 @@ public class KafkaIOTest {
               .asSubclass(Serializer.class)
       ).configure(config, false);
 
-      return MOCK_PRODUCER;
+      // Returning same producer in each instance in a pipeline seems to work fine currently.
+      // If DirectRunner creates multiple DoFn instances for sinks, we might need to handle
+      // it appropriately. I.e. allow multiple producers for each producerKey and concatenate
+      // all the messages written to each producer for verification after the pipeline finishes.
+
+      return MOCK_PRODUCER_MAP.get(producerKey);
     }
   }
 
   private static class InjectedErrorException extends RuntimeException {
-    public InjectedErrorException(String message) {
+    InjectedErrorException(String message) {
       super(message);
     }
   }
@@ -1142,18 +1205,22 @@ public class KafkaIOTest {
    */
   private static class ProducerSendCompletionThread {
 
+    private final MockProducer<Integer, Long> mockProducer;
     private final int maxErrors;
     private final int errorFrequency;
     private final AtomicBoolean done = new AtomicBoolean(false);
     private final ExecutorService injectorThread;
     private int numCompletions = 0;
 
-    ProducerSendCompletionThread() {
+    ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer) {
       // complete everything successfully
-      this(0, 0);
+      this(mockProducer, 0, 0);
     }
 
-    ProducerSendCompletionThread(final int maxErrors, final int errorFrequency) {
+    ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer,
+                                 int maxErrors,
+                                 int errorFrequency) {
+      this.mockProducer = mockProducer;
       this.maxErrors = maxErrors;
       this.errorFrequency = errorFrequency;
       injectorThread = Executors.newSingleThreadExecutor();
@@ -1169,14 +1236,14 @@ public class KafkaIOTest {
             boolean successful;
 
             if (errorsInjected < maxErrors && ((numCompletions + 1) % errorFrequency) == 0) {
-              successful = MOCK_PRODUCER.errorNext(
+              successful = mockProducer.errorNext(
                   new InjectedErrorException("Injected Error #" + (errorsInjected + 1)));
 
               if (successful) {
                 errorsInjected++;
               }
             } else {
-              successful = MOCK_PRODUCER.completeNext();
+              successful = mockProducer.completeNext();
             }
 
             if (successful) {
@@ -1186,6 +1253,7 @@ public class KafkaIOTest {
               try {
                 Thread.sleep(1);
               } catch (InterruptedException e) {
+                // ok to retry.
               }
             }
           }