You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:49:47 UTC

[13/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.

Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/950aa7e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/950aa7e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/950aa7e1

Branch: refs/heads/python-sdk
Commit: 950aa7e1d9c50167933eb192a16e15700e483377
Parents: 12be8b1
Author: Stas Levin <st...@gmail.com>
Authored: Tue Dec 20 17:44:15 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 09:55:46 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java   | 7 +++++--
 .../java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java  | 6 ++++--
 2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index f0ab46c..075805e 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -22,19 +22,23 @@ import static com.google.common.collect.Lists.newArrayList;
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.google.common.collect.Iterables;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.DateTime;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
  * Tests {@link AmazonKinesisMock}.
  */
 public class KinesisMockReadTest {
+
+    @Rule
+    public final transient TestPipeline p = TestPipeline.create();
+
     @Test
     public void readsDataFromMockKinesis() {
         int noOfShards = 3;
@@ -42,7 +46,6 @@ public class KinesisMockReadTest {
         List<List<AmazonKinesisMock.TestData>> testData =
                 provideTestData(noOfShards, noOfEventsPerShard);
 
-        final Pipeline p = TestPipeline.create();
         PCollection<AmazonKinesisMock.TestData> result = p.
                 apply(
                         KinesisIO.Read.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 73a2455..690cc11 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -43,6 +42,7 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -53,6 +53,8 @@ public class KinesisReaderIT {
     private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
     private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
 
+    @Rule
+    public final transient TestPipeline p = TestPipeline.create();
 
     @Ignore
     @Test
@@ -76,7 +78,7 @@ public class KinesisReaderIT {
 
     private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
             throws InterruptedException {
-        final Pipeline p = TestPipeline.create();
+
         PCollection<String> result = p.
                 apply(KinesisIO.Read.
                         from(options.getAwsKinesisStream(), Instant.now()).