You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/20 19:04:53 UTC

[03/17] incubator-beam git commit: Migrated the beam-sdks-java-io-kafka module to TestPipeline as a JUnit rule.

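Migrated the tests in the beam-sdks-java-io-kafka module to use a shared TestPipeline declared as a JUnit @Rule field, instead of creating a new pipeline via TestPipeline.create() inside each test method.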


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12be8b1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12be8b1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12be8b1e

Branch: refs/heads/master
Commit: 12be8b1e6adf342e2b482aa37d5a9577e13802c5
Parents: 8d478c0
Author: Stas Levin <st...@gmail.com>
Authored: Tue Dec 20 17:38:38 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 09:55:45 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 25 +++++++++-----------
 1 file changed, 11 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12be8b1e/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index cc1ef26..071deea 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
@@ -95,6 +94,9 @@ public class KafkaIOTest {
    */
 
   @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   // Update mock consumer with records distributed among the given topics, each with given number
@@ -268,7 +270,6 @@ public class KafkaIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testUnboundedSource() {
-    Pipeline p = TestPipeline.create();
     int numElements = 1000;
 
     PCollection<Long> input = p
@@ -283,7 +284,6 @@ public class KafkaIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testUnboundedSourceWithExplicitPartitions() {
-    Pipeline p = TestPipeline.create();
     int numElements = 1000;
 
     List<String> topics = ImmutableList.of("test");
@@ -322,7 +322,7 @@ public class KafkaIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testUnboundedSourceTimestamps() {
-    Pipeline p = TestPipeline.create();
+
     int numElements = 1000;
 
     PCollection<Long> input = p
@@ -350,7 +350,7 @@ public class KafkaIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testUnboundedSourceSplits() throws Exception {
-    Pipeline p = TestPipeline.create();
+
     int numElements = 1000;
     int numSplits = 10;
 
@@ -514,10 +514,9 @@ public class KafkaIOTest {
 
       ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
 
-      Pipeline pipeline = TestPipeline.create();
       String topic = "test";
 
-      pipeline
+      p
         .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
             .withoutMetadata())
         .apply(KafkaIO.write()
@@ -527,7 +526,7 @@ public class KafkaIOTest {
             .withValueCoder(BigEndianLongCoder.of())
             .withProducerFactoryFn(new ProducerFactoryFn()));
 
-      pipeline.run();
+      p.run();
 
       completionThread.shutdown();
 
@@ -547,10 +546,9 @@ public class KafkaIOTest {
 
       ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
 
-      Pipeline pipeline = TestPipeline.create();
       String topic = "test";
 
-      pipeline
+      p
         .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
             .withoutMetadata())
         .apply(Values.<Long>create()) // there are no keys
@@ -562,7 +560,7 @@ public class KafkaIOTest {
             .withProducerFactoryFn(new ProducerFactoryFn())
             .values());
 
-      pipeline.run();
+      p.run();
 
       completionThread.shutdown();
 
@@ -588,13 +586,12 @@ public class KafkaIOTest {
 
       MOCK_PRODUCER.clear();
 
-      Pipeline pipeline = TestPipeline.create();
       String topic = "test";
 
       ProducerSendCompletionThread completionThreadWithErrors =
           new ProducerSendCompletionThread(10, 100).start();
 
-      pipeline
+      p
         .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
             .withoutMetadata())
         .apply(KafkaIO.write()
@@ -605,7 +602,7 @@ public class KafkaIOTest {
             .withProducerFactoryFn(new ProducerFactoryFn()));
 
       try {
-        pipeline.run();
+        p.run();
       } catch (PipelineExecutionException e) {
         // throwing inner exception helps assert that first exception is thrown from the Sink
         throw e.getCause().getCause();