You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:49:47 UTC
[13/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/950aa7e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/950aa7e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/950aa7e1
Branch: refs/heads/python-sdk
Commit: 950aa7e1d9c50167933eb192a16e15700e483377
Parents: 12be8b1
Author: Stas Levin <st...@gmail.com>
Authored: Tue Dec 20 17:44:15 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 09:55:46 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java | 7 +++++--
.../java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java | 6 ++++--
2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index f0ab46c..075805e 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -22,19 +22,23 @@ import static com.google.common.collect.Lists.newArrayList;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.common.collect.Iterables;
import java.util.List;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.DateTime;
+import org.junit.Rule;
import org.junit.Test;
/**
* Tests {@link AmazonKinesisMock}.
*/
public class KinesisMockReadTest {
+
+ @Rule
+ public final transient TestPipeline p = TestPipeline.create();
+
@Test
public void readsDataFromMockKinesis() {
int noOfShards = 3;
@@ -42,7 +46,6 @@ public class KinesisMockReadTest {
List<List<AmazonKinesisMock.TestData>> testData =
provideTestData(noOfShards, noOfEventsPerShard);
- final Pipeline p = TestPipeline.create();
PCollection<AmazonKinesisMock.TestData> result = p.
apply(
KinesisIO.Read.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 73a2455..690cc11 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -43,6 +42,7 @@ import org.apache.commons.lang.RandomStringUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
/**
@@ -53,6 +53,8 @@ public class KinesisReaderIT {
private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
+ @Rule
+ public final transient TestPipeline p = TestPipeline.create();
@Ignore
@Test
@@ -76,7 +78,7 @@ public class KinesisReaderIT {
private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
throws InterruptedException {
- final Pipeline p = TestPipeline.create();
+
PCollection<String> result = p.
apply(KinesisIO.Read.
from(options.getAwsKinesisStream(), Instant.now()).