You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/20 19:04:53 UTC
[03/17] incubator-beam git commit: Migrated the
beam-sdks-java-io-kafka module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-kafka module to TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12be8b1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12be8b1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12be8b1e
Branch: refs/heads/master
Commit: 12be8b1e6adf342e2b482aa37d5a9577e13802c5
Parents: 8d478c0
Author: Stas Levin <st...@gmail.com>
Authored: Tue Dec 20 17:38:38 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 09:55:45 2016 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 25 +++++++++-----------
1 file changed, 11 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12be8b1e/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index cc1ef26..071deea 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
@@ -95,6 +94,9 @@ public class KafkaIOTest {
*/
@Rule
+ public final transient TestPipeline p = TestPipeline.create();
+
+ @Rule
public ExpectedException thrown = ExpectedException.none();
// Update mock consumer with records distributed among the given topics, each with given number
@@ -268,7 +270,6 @@ public class KafkaIOTest {
@Test
@Category(NeedsRunner.class)
public void testUnboundedSource() {
- Pipeline p = TestPipeline.create();
int numElements = 1000;
PCollection<Long> input = p
@@ -283,7 +284,6 @@ public class KafkaIOTest {
@Test
@Category(NeedsRunner.class)
public void testUnboundedSourceWithExplicitPartitions() {
- Pipeline p = TestPipeline.create();
int numElements = 1000;
List<String> topics = ImmutableList.of("test");
@@ -322,7 +322,7 @@ public class KafkaIOTest {
@Test
@Category(NeedsRunner.class)
public void testUnboundedSourceTimestamps() {
- Pipeline p = TestPipeline.create();
+
int numElements = 1000;
PCollection<Long> input = p
@@ -350,7 +350,7 @@ public class KafkaIOTest {
@Test
@Category(NeedsRunner.class)
public void testUnboundedSourceSplits() throws Exception {
- Pipeline p = TestPipeline.create();
+
int numElements = 1000;
int numSplits = 10;
@@ -514,10 +514,9 @@ public class KafkaIOTest {
ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
- Pipeline pipeline = TestPipeline.create();
String topic = "test";
- pipeline
+ p
.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
.withoutMetadata())
.apply(KafkaIO.write()
@@ -527,7 +526,7 @@ public class KafkaIOTest {
.withValueCoder(BigEndianLongCoder.of())
.withProducerFactoryFn(new ProducerFactoryFn()));
- pipeline.run();
+ p.run();
completionThread.shutdown();
@@ -547,10 +546,9 @@ public class KafkaIOTest {
ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
- Pipeline pipeline = TestPipeline.create();
String topic = "test";
- pipeline
+ p
.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
.withoutMetadata())
.apply(Values.<Long>create()) // there are no keys
@@ -562,7 +560,7 @@ public class KafkaIOTest {
.withProducerFactoryFn(new ProducerFactoryFn())
.values());
- pipeline.run();
+ p.run();
completionThread.shutdown();
@@ -588,13 +586,12 @@ public class KafkaIOTest {
MOCK_PRODUCER.clear();
- Pipeline pipeline = TestPipeline.create();
String topic = "test";
ProducerSendCompletionThread completionThreadWithErrors =
new ProducerSendCompletionThread(10, 100).start();
- pipeline
+ p
.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
.withoutMetadata())
.apply(KafkaIO.write()
@@ -605,7 +602,7 @@ public class KafkaIOTest {
.withProducerFactoryFn(new ProducerFactoryFn()));
try {
- pipeline.run();
+ p.run();
} catch (PipelineExecutionException e) {
// throwing inner exception helps assert that first exception is thrown from the Sink
throw e.getCause().getCause();