You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/07 20:21:12 UTC
[02/12] incubator-beam git commit: [BEAM-151] Break out Dataflow
runner dependency to separate test file
[BEAM-151] Break out Dataflow runner dependency to separate test file
This allows moving the Dataflow-specific portion of the test to the Dataflow runner Maven module.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a18ab871
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a18ab871
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a18ab871
Branch: refs/heads/master
Commit: a18ab871f8f1b9a3219cee2ecd5d519af23efa87
Parents: 7f0a2f7
Author: Luke Cwik <lc...@google.com>
Authored: Fri Mar 25 16:09:18 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Apr 7 11:18:32 2016 -0700
----------------------------------------------------------------------
.../sdk/transforms/DataflowGroupByKeyTest.java | 110 +++++++++++++++++++
.../dataflow/sdk/transforms/GroupByKeyTest.java | 62 -----------
2 files changed, 110 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a18ab871/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java
new file mode 100644
index 0000000..b05e7a2
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Tests for {@link GroupByKey} error reporting with the {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowGroupByKeyTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none(); // each test arms this before the failing call
+
+ /**
+ * Creates a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
+ * is not expanded. This is used for verifying that the proper errors show up even without
+ * expansion.
+ */
+ private Pipeline createTestServiceRunner() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setProject("someproject"); // placeholder value; the tests in this class never run the pipeline
+ options.setStagingLocation("gs://staging"); // placeholder value, paired with the no-op validator below
+ options.setPathValidatorClass(NoopPathValidator.class);
+ options.setDataflowClient(null); // NOTE(review): presumably safe because no job is ever submitted - confirm
+ return Pipeline.create(options);
+ }
+
+ @Test
+ public void testInvalidWindowsService() {
+ Pipeline p = createTestServiceRunner();
+
+ List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); // no elements; the failure is expected from apply()
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(Create.of(ungroupedPairs)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+ .apply(Window.<KV<String, Integer>>into(
+ Sessions.withGapDuration(Duration.standardMinutes(1))));
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("GroupByKey must have a valid Window merge function");
+ input
+ .apply("GroupByKey", GroupByKey.<String, Integer>create())
+ .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create()); // NOTE(review): presumably this second GroupByKey is the one that throws - confirm
+ }
+
+ @Test
+ public void testGroupByKeyServiceUnbounded() {
+ Pipeline p = createTestServiceRunner();
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ new PTransform<PBegin, PCollection<KV<String, Integer>>>() { // stub source: only declares an output PCollection
+ @Override
+ public PCollection<KV<String, Integer>> apply(PBegin input) {
+ return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED)
+ .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+ }
+ });
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage(
+ "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+ + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+ input.apply("GroupByKey", GroupByKey.<String, Integer>create()); // unbounded + global default windowing: expected to throw here
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a18ab871/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
index bb64f60..6fb811e 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
@@ -28,10 +28,8 @@ import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
@@ -41,7 +39,6 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PBegin;
@@ -241,21 +238,6 @@ public class GroupByKeyTest {
Duration.standardMinutes(1)))));
}
- /**
- * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
- * is not expanded. This is used for verifying that even without expansion the proper errors show
- * up.
- */
- private Pipeline createTestServiceRunner() {
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowPipelineRunner.class);
- options.setProject("someproject");
- options.setStagingLocation("gs://staging");
- options.setPathValidatorClass(NoopPathValidator.class);
- options.setDataflowClient(null);
- return Pipeline.create(options);
- }
-
private Pipeline createTestDirectRunner() {
DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
options.setRunner(DirectPipelineRunner.class);
@@ -282,25 +264,6 @@ public class GroupByKeyTest {
}
@Test
- public void testInvalidWindowsService() {
- Pipeline p = createTestServiceRunner();
-
- List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
-
- PCollection<KV<String, Integer>> input =
- p.apply(Create.of(ungroupedPairs)
- .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
- .apply(Window.<KV<String, Integer>>into(
- Sessions.withGapDuration(Duration.standardMinutes(1))));
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("GroupByKey must have a valid Window merge function");
- input
- .apply("GroupByKey", GroupByKey.<String, Integer>create())
- .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
- }
-
- @Test
public void testRemerge() {
Pipeline p = TestPipeline.create();
@@ -350,31 +313,6 @@ public class GroupByKeyTest {
input.apply("GroupByKey", GroupByKey.<String, Integer>create());
}
- @Test
- public void testGroupByKeyServiceUnbounded() {
- Pipeline p = createTestServiceRunner();
-
- PCollection<KV<String, Integer>> input =
- p.apply(
- new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
- @Override
- public PCollection<KV<String, Integer>> apply(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
- }
- });
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(
- "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
- + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
-
- input.apply("GroupByKey", GroupByKey.<String, Integer>create());
- }
-
/**
* Tests that when two elements are combined via a GroupByKey their output timestamp agrees
* with the windowing function customized to actually be the same as the default, the earlier of