You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/07 20:21:12 UTC

[02/12] incubator-beam git commit: [BEAM-151] Break out Dataflow runner dependency to separate test file

[BEAM-151] Break out Dataflow runner dependency to separate test file

This allows moving the Dataflow-specific portion of the test to the Dataflow runner Maven module.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a18ab871
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a18ab871
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a18ab871

Branch: refs/heads/master
Commit: a18ab871f8f1b9a3219cee2ecd5d519af23efa87
Parents: 7f0a2f7
Author: Luke Cwik <lc...@google.com>
Authored: Fri Mar 25 16:09:18 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Apr 7 11:18:32 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/DataflowGroupByKeyTest.java  | 110 +++++++++++++++++++
 .../dataflow/sdk/transforms/GroupByKeyTest.java |  62 -----------
 2 files changed, 110 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a18ab871/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java
new file mode 100644
index 0000000..b05e7a2
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowGroupByKeyTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
+   * is not expanded. This is used for verifying that even without expansion the proper errors show
+   * up.
+   */
+  private Pipeline createTestServiceRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  @Test
+  public void testInvalidWindowsService() {
+    Pipeline p = createTestServiceRunner();
+
+    List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(Create.of(ungroupedPairs)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            Sessions.withGapDuration(Duration.standardMinutes(1))));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("GroupByKey must have a valid Window merge function");
+    input
+        .apply("GroupByKey", GroupByKey.<String, Integer>create())
+        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
+  }
+
+  @Test
+  public void testGroupByKeyServiceUnbounded() {
+    Pipeline p = createTestServiceRunner();
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(
+            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+              @Override
+              public PCollection<KV<String, Integer>> apply(PBegin input) {
+                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+                        input.getPipeline(),
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.UNBOUNDED)
+                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+              }
+            });
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+        + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+    input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a18ab871/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
index bb64f60..6fb811e 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
@@ -28,10 +28,8 @@ import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.MapCoder;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
@@ -41,7 +39,6 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PBegin;
@@ -241,21 +238,6 @@ public class GroupByKeyTest {
                     Duration.standardMinutes(1)))));
   }
 
-  /**
-   * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
-   * is not expanded. This is used for verifying that even without expansion the proper errors show
-   * up.
-   */
-  private Pipeline createTestServiceRunner() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
-    return Pipeline.create(options);
-  }
-
   private Pipeline createTestDirectRunner() {
     DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
     options.setRunner(DirectPipelineRunner.class);
@@ -282,25 +264,6 @@ public class GroupByKeyTest {
   }
 
   @Test
-  public void testInvalidWindowsService() {
-    Pipeline p = createTestServiceRunner();
-
-    List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
-
-    PCollection<KV<String, Integer>> input =
-        p.apply(Create.of(ungroupedPairs)
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
-        .apply(Window.<KV<String, Integer>>into(
-            Sessions.withGapDuration(Duration.standardMinutes(1))));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("GroupByKey must have a valid Window merge function");
-    input
-        .apply("GroupByKey", GroupByKey.<String, Integer>create())
-        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
-  }
-
-  @Test
   public void testRemerge() {
     Pipeline p = TestPipeline.create();
 
@@ -350,31 +313,6 @@ public class GroupByKeyTest {
     input.apply("GroupByKey", GroupByKey.<String, Integer>create());
   }
 
-  @Test
-  public void testGroupByKeyServiceUnbounded() {
-    Pipeline p = createTestServiceRunner();
-
-    PCollection<KV<String, Integer>> input =
-        p.apply(
-            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
-              @Override
-              public PCollection<KV<String, Integer>> apply(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
-              }
-            });
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
-        + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
-
-    input.apply("GroupByKey", GroupByKey.<String, Integer>create());
-  }
-
   /**
    * Tests that when two elements are combined via a GroupByKey their output timestamp agrees
    * with the windowing function customized to actually be the same as the default, the earlier of