You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 01:20:52 UTC

[1/3] beam git commit: Make crashing errors in Structs unchecked exceptions

Repository: beam
Updated Branches:
  refs/heads/master 524165ac9 -> e4eae7bce


Make crashing errors in Structs unchecked exceptions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c0bdd6c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c0bdd6c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c0bdd6c

Branch: refs/heads/master
Commit: 4c0bdd6c002b83c67daedd5e01ee2ad0dd47c233
Parents: b47fd52
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 20 14:32:29 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 20 14:32:29 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/Structs.java  | 74 ++++++++------------
 1 file changed, 31 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4c0bdd6c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
index d50b74a..a4be054 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Structs.java
@@ -26,23 +26,22 @@ import java.util.Map;
 import javax.annotation.Nullable;
 
 /**
- * A collection of static methods for manipulating datastructure representations
- * transferred via the Dataflow API.
+ * A collection of static methods for manipulating datastructure representations transferred via the
+ * Dataflow API.
  */
 public final class Structs {
-  private Structs() {}  // Non-instantiable
+  private Structs() {} // Non-instantiable
 
-  public static String getString(Map<String, Object> map, String name) throws Exception {
+  public static String getString(Map<String, Object> map, String name) {
     return getValue(map, name, String.class, "a string");
   }
 
   public static String getString(
-      Map<String, Object> map, String name, @Nullable String defaultValue)
-      throws Exception {
+      Map<String, Object> map, String name, @Nullable String defaultValue) {
     return getValue(map, name, String.class, "a string", defaultValue);
   }
 
-  public static byte[] getBytes(Map<String, Object> map, String name) throws Exception {
+  public static byte[] getBytes(Map<String, Object> map, String name) {
     @Nullable byte[] result = getBytes(map, name, null);
     if (result == null) {
       throw new ParameterNotFoundException(name, map);
@@ -51,8 +50,8 @@ public final class Structs {
   }
 
   @Nullable
-  public static byte[] getBytes(Map<String, Object> map, String name, @Nullable byte[] defaultValue)
-      throws Exception {
+  public static byte[] getBytes(
+      Map<String, Object> map, String name, @Nullable byte[] defaultValue) {
     @Nullable String jsonString = getString(map, name, null);
     if (jsonString == null) {
       return defaultValue;
@@ -63,41 +62,38 @@ public final class Structs {
     return StringUtils.jsonStringToByteArray(jsonString);
   }
 
-  public static Boolean getBoolean(Map<String, Object> map, String name) throws Exception {
+  public static Boolean getBoolean(Map<String, Object> map, String name) {
     return getValue(map, name, Boolean.class, "a boolean");
   }
 
   @Nullable
   public static Boolean getBoolean(
-      Map<String, Object> map, String name, @Nullable Boolean defaultValue)
-      throws Exception {
+      Map<String, Object> map, String name, @Nullable Boolean defaultValue) {
     return getValue(map, name, Boolean.class, "a boolean", defaultValue);
   }
 
-  public static Long getLong(Map<String, Object> map, String name) throws Exception {
+  public static Long getLong(Map<String, Object> map, String name) {
     return getValue(map, name, Long.class, "a long");
   }
 
   @Nullable
-  public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue)
-      throws Exception {
+  public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue) {
     return getValue(map, name, Long.class, "a long", defaultValue);
   }
 
-  public static Integer getInt(Map<String, Object> map, String name) throws Exception {
+  public static Integer getInt(Map<String, Object> map, String name) {
     return getValue(map, name, Integer.class, "an int");
   }
 
   @Nullable
-  public static Integer getInt(Map<String, Object> map, String name, @Nullable Integer defaultValue)
-      throws Exception {
+  public static Integer getInt(
+      Map<String, Object> map, String name, @Nullable Integer defaultValue) {
     return getValue(map, name, Integer.class, "an int", defaultValue);
   }
 
   @Nullable
   public static List<String> getStrings(
-      Map<String, Object> map, String name, @Nullable List<String> defaultValue)
-      throws Exception {
+      Map<String, Object> map, String name, @Nullable List<String> defaultValue) {
     @Nullable Object value = map.get(name);
     if (value == null) {
       if (map.containsKey(name)) {
@@ -130,8 +126,7 @@ public final class Structs {
     return result;
   }
 
-  public static Map<String, Object> getObject(Map<String, Object> map, String name)
-      throws Exception {
+  public static Map<String, Object> getObject(Map<String, Object> map, String name) {
     @Nullable Map<String, Object> result = getObject(map, name, null);
     if (result == null) {
       throw new ParameterNotFoundException(name, map);
@@ -141,8 +136,7 @@ public final class Structs {
 
   @Nullable
   public static Map<String, Object> getObject(
-      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue)
-      throws Exception {
+      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
     @Nullable Object value = map.get(name);
     if (value == null) {
       if (map.containsKey(name)) {
@@ -154,7 +148,7 @@ public final class Structs {
   }
 
   private static Map<String, Object> checkObject(
-      Object value, Map<String, Object> map, String name) throws Exception {
+      Object value, Map<String, Object> map, String name) {
     if (Data.isNull(value)) {
       // This is a JSON literal null.  When represented as an object, this is an
       // empty map.
@@ -166,17 +160,16 @@ public final class Structs {
     @SuppressWarnings("unchecked")
     Map<String, Object> mapValue = (Map<String, Object>) value;
     if (!mapValue.containsKey(PropertyNames.OBJECT_TYPE_NAME)) {
-      throw new IncorrectTypeException(name, map,
-          "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
+      throw new IncorrectTypeException(
+          name, map, "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
     }
     return mapValue;
   }
 
   @Nullable
-  public static List<Map<String, Object>> getListOfMaps(Map<String, Object> map, String name,
-      @Nullable List<Map<String, Object>> defaultValue) throws Exception {
-    @Nullable
-    Object value = map.get(name);
+  public static List<Map<String, Object>> getListOfMaps(
+      Map<String, Object> map, String name, @Nullable List<Map<String, Object>> defaultValue) {
+    @Nullable Object value = map.get(name);
     if (value == null) {
       if (map.containsKey(name)) {
         throw new IncorrectTypeException(name, map, "a list");
@@ -205,8 +198,7 @@ public final class Structs {
     return result;
   }
 
-  public static Map<String, Object> getDictionary(
-      Map<String, Object> map, String name) throws Exception {
+  public static Map<String, Object> getDictionary(Map<String, Object> map, String name) {
     @Nullable Object value = map.get(name);
     if (value == null) {
       throw new ParameterNotFoundException(name, map);
@@ -226,8 +218,7 @@ public final class Structs {
 
   @Nullable
   public static Map<String, Object> getDictionary(
-      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue)
-      throws Exception {
+      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue) {
     @Nullable Object value = map.get(name);
     if (value == null) {
       if (map.containsKey(name)) {
@@ -262,8 +253,7 @@ public final class Structs {
     addObject(map, name, CloudObject.forInteger(value));
   }
 
-  public static void addObject(
-      Map<String, Object> map, String name, Map<String, Object> value) {
+  public static void addObject(Map<String, Object> map, String name, Map<String, Object> value) {
     map.put(name, value);
   }
 
@@ -308,8 +298,7 @@ public final class Structs {
 
   // Helper methods for a few of the accessor methods.
 
-  private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type)
-      throws Exception {
+  private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type) {
     @Nullable T result = getValue(map, name, clazz, type, null);
     if (result == null) {
       throw new ParameterNotFoundException(name, map);
@@ -319,8 +308,7 @@ public final class Structs {
 
   @Nullable
   private static <T> T getValue(
-      Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue)
-      throws Exception {
+      Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue) {
     @Nullable Object value = map.get(name);
     if (value == null) {
       if (map.containsKey(name)) {
@@ -369,13 +357,13 @@ public final class Structs {
     }
   }
 
-  private static final class ParameterNotFoundException extends Exception {
+  private static final class ParameterNotFoundException extends RuntimeException {
     public ParameterNotFoundException(String name, Map<String, Object> map) {
       super("didn't find required parameter " + name + " in " + map);
     }
   }
 
-  private static final class IncorrectTypeException extends Exception {
+  private static final class IncorrectTypeException extends RuntimeException {
     public IncorrectTypeException(String name, Map<String, Object> map, String type) {
       super("required parameter " + name + " in " + map + " not " + type);
     }


[3/3] beam git commit: This closes #2618: Use step-derived PCollection names in Dataflow

Posted by ke...@apache.org.
This closes #2618: Use step-derived PCollection names in Dataflow

  Derive Dataflow output names from steps, not PCollection names
  Make crashing errors in Structs unchecked exceptions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4eae7bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4eae7bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4eae7bc

Branch: refs/heads/master
Commit: e4eae7bce4da75744379711d8f45d00b2ba07ad3
Parents: 524165a c9ed8f9
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 20 18:19:18 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 20 18:19:18 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    |  7 +-
 .../DataflowPipelineTranslatorTest.java         | 94 ++++++++++++++++++++
 .../java/org/apache/beam/sdk/util/Structs.java  | 74 +++++++--------
 3 files changed, 131 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e4eae7bc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------


[2/3] beam git commit: Derive Dataflow output names from steps, not PCollection names

Posted by ke...@apache.org.
Derive Dataflow output names from steps, not PCollection names

Long ago, PCollection names were assigned after transform replacements took
place, because replacement happened interleaved with pipeline construction.
Now, runner-independent graphs are constructed with named PCollections, and
when replacements occur, the names are preserved. This exposed a bug in
Dataflow whereby the names of steps and the names of PCollections are tightly
coupled.

This change uses the mandatory step-derived names ("<step name>.out<index>")
for outputs during translation, shielding users from the bug.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9ed8f9a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9ed8f9a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9ed8f9a

Branch: refs/heads/master
Commit: c9ed8f9a69d2b3f17e782f4bd0da9bd4305f2320
Parents: 4c0bdd6
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 20 15:32:51 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 20 15:32:51 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    |  7 +-
 .../DataflowPipelineTranslatorTest.java         | 94 ++++++++++++++++++++
 2 files changed, 100 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c9ed8f9a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index abeca4d..0c0a2ef 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -656,7 +656,12 @@ public class DataflowPipelineTranslator {
 
       Map<String, Object> outputInfo = new HashMap<>();
       addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id));
-      addString(outputInfo, PropertyNames.USER_NAME, value.getName());
+
+      String stepName = getString(properties, PropertyNames.USER_NAME);
+      String generatedName = String.format(
+          "%s.out%d", stepName, outputInfoList.size());
+
+      addString(outputInfo, PropertyNames.USER_NAME, generatedName);
       if (value instanceof PCollection
           && translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
         addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);

http://git-wip-us.apache.org/repos/asf/beam/blob/c9ed8f9a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 5016d88..9396169 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -772,6 +772,100 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   }
 
   /**
+   * Test that in translation the name for a collection (in this case just a Create output) is
+   * overridden to be what the Dataflow service expects.
+   */
+  @Test
+  public void testNamesOverridden() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    options.setStreaming(false);
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    pipeline.apply("Jazzy", Create.of(3)).setName("foobizzle");
+
+    runner.replaceTransforms(pipeline);
+
+    Job job = translator.translate(pipeline,
+        runner,
+        Collections.<DataflowPackage>emptyList()).getJob();
+
+    // The Create step
+    Step step = job.getSteps().get(0);
+
+    // This is the name that is "set by the user" that the Dataflow translator must override
+    String userSpecifiedName =
+        Structs.getString(
+            Structs.getListOfMaps(
+                step.getProperties(),
+                PropertyNames.OUTPUT_INFO,
+                null).get(0),
+        PropertyNames.USER_NAME);
+
+    // This is the calculated name that must actually be used
+    String calculatedName = getString(step.getProperties(), PropertyNames.USER_NAME) + ".out0";
+
+    assertThat(userSpecifiedName, equalTo(calculatedName));
+  }
+
+  /**
+   * Test that in translation the name for collections of a multi-output ParDo - a special case
+   * because the user can name tags - are overridden to be what the Dataflow service expects.
+   */
+  @Test
+  public void testTaggedNamesOverridden() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    options.setStreaming(false);
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    TupleTag<Integer> tag1 = new TupleTag<Integer>("frazzle") {};
+    TupleTag<Integer> tag2 = new TupleTag<Integer>("bazzle") {};
+    TupleTag<Integer> tag3 = new TupleTag<Integer>() {};
+
+    PCollectionTuple outputs =
+        pipeline
+            .apply(Create.of(3))
+            .apply(
+                ParDo.of(
+                        new DoFn<Integer, Integer>() {
+                          @ProcessElement
+                          public void drop() {}
+                        })
+                    .withOutputTags(tag1, TupleTagList.of(tag2).and(tag3)));
+
+    outputs.get(tag1).setName("bizbazzle");
+    outputs.get(tag2).setName("gonzaggle");
+    outputs.get(tag3).setName("froonazzle");
+
+    runner.replaceTransforms(pipeline);
+
+    Job job = translator.translate(pipeline,
+        runner,
+        Collections.<DataflowPackage>emptyList()).getJob();
+
+    // The ParDo step
+    Step step = job.getSteps().get(1);
+    String stepName = Structs.getString(step.getProperties(), PropertyNames.USER_NAME);
+
+    List<Map<String, Object>> outputInfos =
+        Structs.getListOfMaps(step.getProperties(), PropertyNames.OUTPUT_INFO, null);
+
+    assertThat(outputInfos.size(), equalTo(3));
+
+    // The names set by the user _and_ the tags _must_ be ignored, or metrics will not show up.
+    for (int i = 0; i < outputInfos.size(); ++i) {
+      assertThat(
+          Structs.getString(outputInfos.get(i), PropertyNames.USER_NAME),
+          equalTo(String.format("%s.out%s", stepName, i)));
+    }
+  }
+
+  /**
    * Smoke test to fail fast if translation of a stateful ParDo
    * in batch breaks.
    */