You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/11/24 00:03:01 UTC

[01/11] incubator-beam git commit: Reject stateful DoFn in FlinkRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 9b9d016c8 -> 3dbeb8edf


Reject stateful DoFn in FlinkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/413a4024
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/413a4024
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/413a4024

Branch: refs/heads/python-sdk
Commit: 413a40243a30e059476395a2dcbfc98a94bb22f2
Parents: 4dd1978
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:28 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:01:31 2016 -0800

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |  1 +
 .../FlinkBatchTransformTranslators.java         | 34 +++++++++++++++++---
 .../FlinkStreamingTransformTranslators.java     | 25 +++++++++++++-
 3 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/413a4024/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index adcb3de..c060c25 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -53,6 +53,7 @@
                 </goals>
                 <configuration>
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                  <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>
                   <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/413a4024/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 935a9ac..474d4e3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
@@ -46,6 +47,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -54,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -487,11 +490,23 @@ class FlinkBatchTransformTranslators {
     @Override
     public void translateNode(
         ParDo.Bound<InputT, OutputT> transform,
+
         FlinkBatchTranslationContext context) {
+      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                DoFn.StateId.class.getSimpleName(),
+                doFn.getClass().getName(),
+                DoFn.class.getSimpleName(),
+                FlinkRunner.class.getSimpleName()));
+      }
+
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final OldDoFn<InputT, OutputT> doFn = transform.getFn();
+      final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
 
       TypeInformation<WindowedValue<OutputT>> typeInformation =
           context.getTypeInfo(context.getOutput(transform));
@@ -507,7 +522,7 @@ class FlinkBatchTransformTranslators {
 
       FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkDoFnFunction<>(
-              doFn,
+              oldDoFn,
               context.getOutput(transform).getWindowingStrategy(),
               sideInputStrategies,
               context.getPipelineOptions());
@@ -533,10 +548,21 @@ class FlinkBatchTransformTranslators {
     public void translateNode(
         ParDo.BoundMulti<InputT, OutputT> transform,
         FlinkBatchTranslationContext context) {
+      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                DoFn.StateId.class.getSimpleName(),
+                doFn.getClass().getName(),
+                DoFn.class.getSimpleName(),
+                FlinkRunner.class.getSimpleName()));
+      }
+
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final OldDoFn<InputT, OutputT> doFn = transform.getFn();
+      final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
 
       Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
 
@@ -584,7 +610,7 @@ class FlinkBatchTransformTranslators {
       @SuppressWarnings("unchecked")
       FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkMultiOutputDoFnFunction(
-              doFn,
+              oldDoFn,
               windowingStrategy,
               sideInputStrategies,
               context.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/413a4024/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 687e9c8..40dfbb9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.flink.translation;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -51,6 +50,7 @@ import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -58,6 +58,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -311,6 +312,17 @@ public class FlinkStreamingTransformTranslators {
         ParDo.Bound<InputT, OutputT> transform,
         FlinkStreamingTranslationContext context) {
 
+      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                DoFn.StateId.class.getSimpleName(),
+                doFn.getClass().getName(),
+                DoFn.class.getSimpleName(),
+                FlinkRunner.class.getSimpleName()));
+      }
+
       WindowingStrategy<?, ?> windowingStrategy =
           context.getOutput(transform).getWindowingStrategy();
 
@@ -460,6 +472,17 @@ public class FlinkStreamingTransformTranslators {
         ParDo.BoundMulti<InputT, OutputT> transform,
         FlinkStreamingTranslationContext context) {
 
+      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                DoFn.StateId.class.getSimpleName(),
+                doFn.getClass().getName(),
+                DoFn.class.getSimpleName(),
+                FlinkRunner.class.getSimpleName()));
+      }
+
       // we assume that the transformation does not change the windowing strategy.
       WindowingStrategy<?, ?> windowingStrategy =
           context.getInput(transform).getWindowingStrategy();


[05/11] incubator-beam git commit: Simplify the API for managing MetricsEnvironment

Posted by da...@apache.org.
Simplify the API for managing MetricsEnvironment

1. setCurrentContainer returns the previous MetricsEnvironment
2. setCurrentContainer(null) resets the thread local
3. scopedCurrentContainer sets the container and returns a Closeable to
   reset the previous container.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6fa8f658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6fa8f658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6fa8f658

Branch: refs/heads/python-sdk
Commit: 6fa8f658abaac1d3a983bfc3b8c09422159af8aa
Parents: 796ba7a
Author: bchambers <bc...@google.com>
Authored: Tue Nov 22 11:37:23 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/TransformExecutor.java  |  5 +-
 .../beam/sdk/metrics/MetricsEnvironment.java    | 60 +++++++++++++++-----
 .../sdk/metrics/MetricsEnvironmentTest.java     |  8 +--
 .../apache/beam/sdk/metrics/MetricsTest.java    |  6 +-
 4 files changed, 56 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 1704955..fb31cc9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
@@ -89,8 +90,7 @@ class TransformExecutor<T> implements Runnable {
   @Override
   public void run() {
     MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName());
-    MetricsEnvironment.setMetricsContainer(metricsContainer);
-    try {
+    try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
       Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
       for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
         ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
@@ -117,7 +117,6 @@ class TransformExecutor<T> implements Runnable {
       // Report the physical metrics from the end of this step.
       context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
 
-      MetricsEnvironment.unsetMetricsContainer();
       transformEvaluationState.complete(this);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index ef2660a8..7c06cbf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.metrics;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.slf4j.Logger;
@@ -29,11 +31,13 @@ import org.slf4j.LoggerFactory;
  * returned objects to create and modify metrics.
  *
  * <p>The runner should create {@link MetricsContainer} for each context in which metrics are
- * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that
- * may update metrics within that step.
+ * reported (by step and name) and call {@link #setCurrentContainer} before invoking any code that
+ * may update metrics within that step. It should call {@link #setCurrentContainer} again to restore
+ * the previous container.
  *
- * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to
- * the previous value) when exiting code that set the metrics container.
+ * <p>Alternatively, the runner can use {@link #scopedMetricsContainer(MetricsContainer)} to set the
+ * container for the current thread and get a {@link Closeable} that will restore the previous
+ * container when closed.
  */
 public class MetricsEnvironment {
 
@@ -45,15 +49,20 @@ public class MetricsEnvironment {
   private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD =
       new ThreadLocal<MetricsContainer>();
 
-  /** Set the {@link MetricsContainer} for the current thread. */
-  public static void setMetricsContainer(MetricsContainer container) {
-    CONTAINER_FOR_THREAD.set(container);
-  }
-
-
-  /** Clear the {@link MetricsContainer} for the current thread. */
-  public static void unsetMetricsContainer() {
-    CONTAINER_FOR_THREAD.remove();
+  /**
+   * Set the {@link MetricsContainer} for the current thread.
+   *
+   * @return The previous container for the current thread.
+   */
+  @Nullable
+  public static MetricsContainer setCurrentContainer(@Nullable MetricsContainer container) {
+    MetricsContainer previous = getCurrentContainer();
+    if (container == null) {
+      CONTAINER_FOR_THREAD.remove();
+    } else {
+      CONTAINER_FOR_THREAD.set(container);
+    }
+    return previous;
   }
 
   /** Called by the run to indicate whether metrics reporting is supported. */
@@ -62,6 +71,31 @@ public class MetricsEnvironment {
   }
 
   /**
+   * Set the {@link MetricsContainer} for the current thread.
+   *
+   * @return A {@link Closeable} that will reset the current container to the previous
+   * {@link MetricsContainer} when closed.
+   */
+  public static Closeable scopedMetricsContainer(MetricsContainer container) {
+    return new ScopedContainer(container);
+  }
+
+  private static class ScopedContainer implements Closeable {
+
+    @Nullable
+    private final MetricsContainer oldContainer;
+
+    private ScopedContainer(MetricsContainer newContainer) {
+      this.oldContainer = setCurrentContainer(newContainer);
+    }
+
+    @Override
+    public void close() throws IOException {
+      setCurrentContainer(oldContainer);
+    }
+  }
+
+  /**
    * Return the {@link MetricsContainer} for the current thread.
    *
    * <p>May return null if metrics are not supported by the current runner or if the current thread

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
index 4200a20..0ce17b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
@@ -35,7 +35,7 @@ import org.junit.runners.JUnit4;
 public class MetricsEnvironmentTest {
   @After
   public void teardown() {
-    MetricsEnvironment.unsetMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(null);
   }
 
   @Test
@@ -44,11 +44,11 @@ public class MetricsEnvironmentTest {
     MetricsContainer c1 = new MetricsContainer("step1");
     MetricsContainer c2 = new MetricsContainer("step2");
 
-    MetricsEnvironment.setMetricsContainer(c1);
+    MetricsEnvironment.setCurrentContainer(c1);
     counter.inc();
-    MetricsEnvironment.setMetricsContainer(c2);
+    MetricsEnvironment.setCurrentContainer(c2);
     counter.dec();
-    MetricsEnvironment.unsetMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(null);
 
     MetricUpdates updates1 = c1.getUpdates();
     MetricUpdates updates2 = c2.getUpdates();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index d11b44d..732cb34 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -37,7 +37,7 @@ public class MetricsTest {
 
   @After
   public void tearDown() {
-    MetricsEnvironment.unsetMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(null);
   }
 
   @Test
@@ -61,7 +61,7 @@ public class MetricsTest {
   @Test
   public void distributionToCell() {
     MetricsContainer container = new MetricsContainer("step");
-    MetricsEnvironment.setMetricsContainer(container);
+    MetricsEnvironment.setCurrentContainer(container);
 
     Distribution distribution = Metrics.distribution(NS, NAME);
 
@@ -80,7 +80,7 @@ public class MetricsTest {
   @Test
   public void counterToCell() {
     MetricsContainer container = new MetricsContainer("step");
-    MetricsEnvironment.setMetricsContainer(container);
+    MetricsEnvironment.setCurrentContainer(container);
     Counter counter = Metrics.counter(NS, NAME);
     CounterCell cell = container.getCounter(METRIC_NAME);
     counter.inc();


[10/11] incubator-beam git commit: Update transitive dependencies for Apex 3.5.0 snapshot version.

Posted by da...@apache.org.
Update transitive dependencies for Apex 3.5.0 snapshot version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e03bb8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e03bb8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e03bb8a

Branch: refs/heads/python-sdk
Commit: 2e03bb8a136078064014a0a7101960f6d2019487
Parents: 6f86af6
Author: Thomas Weise <th...@apache.org>
Authored: Tue Nov 22 11:38:00 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:05 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e03bb8a/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index d0b0fdf..84185b8 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -216,11 +216,11 @@
               <ignoredUsedUndeclaredDependencies>
                 <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.1</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.2.1</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.2.0</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency>
               </ignoredUsedUndeclaredDependencies>


[07/11] incubator-beam git commit: Output Keyed Bundles in GroupAlsoByWindowEvaluator

Posted by da...@apache.org.
Output Keyed Bundles in GroupAlsoByWindowEvaluator

This allows reuse of keys for downstream serialization.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f03b4fe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f03b4fe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f03b4fe1

Branch: refs/heads/python-sdk
Commit: f03b4fe11cb605edf216903738a6c305b3a91066
Parents: 6fa8f65
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 22 14:51:39 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/direct/DirectRunner.java  | 5 ++++-
 .../beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java  | 4 +++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f03b4fe1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 0060e84..cb31947 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -310,7 +311,9 @@ public class DirectRunner
     KeyedPValueTrackingVisitor keyedPValueVisitor =
         KeyedPValueTrackingVisitor.create(
             ImmutableSet.<Class<? extends PTransform>>of(
-                GroupByKey.class, DirectGroupByKeyOnly.class));
+                GBKIntoKeyedWorkItems.class,
+                DirectGroupByKeyOnly.class,
+                DirectGroupAlsoByWindow.class));
     pipeline.traverseTopologically(keyedPValueVisitor);
 
     DisplayDataValidator.validatePipeline(pipeline);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f03b4fe1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index b946e4d..36c742b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -112,6 +112,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
     private @SuppressWarnings("unchecked") final WindowingStrategy<?, BoundedWindow>
         windowingStrategy;
 
+    private final StructuralKey<?> structuralKey;
     private final Collection<UncommittedBundle<?>> outputBundles;
     private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements;
     private final AggregatorContainer.Mutator aggregatorChanges;
@@ -130,6 +131,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       this.evaluationContext = evaluationContext;
       this.application = application;
 
+      structuralKey = inputBundle.getKey();
       stepContext = evaluationContext
           .getExecutionContext(application, inputBundle.getKey())
           .getOrCreateStepContext(
@@ -159,7 +161,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       K key = workItem.key();
 
       UncommittedBundle<KV<K, Iterable<V>>> bundle =
-          evaluationContext.createBundle(application.getOutput());
+          evaluationContext.createKeyedBundle(structuralKey, application.getOutput());
       outputBundles.add(bundle);
       CopyOnAccessInMemoryStateInternals<K> stateInternals =
           (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();


[11/11] incubator-beam git commit: This closes #1432

Posted by da...@apache.org.
This closes #1432


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3dbeb8ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3dbeb8ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3dbeb8ed

Branch: refs/heads/python-sdk
Commit: 3dbeb8edfdfe4c9e8987e4d8df4451fdb748dc07
Parents: 9b9d016 2e03bb8
Author: Davor Bonaci <da...@google.com>
Authored: Wed Nov 23 16:02:41 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:41 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |   7 +-
 .../translation/ParDoBoundMultiTranslator.java  |  67 +++++----
 .../apex/translation/ParDoBoundTranslator.java  |  46 +++---
 .../beam/runners/direct/DirectRunner.java       |   5 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   4 +-
 .../beam/runners/direct/TransformExecutor.java  |   5 +-
 runners/flink/runner/pom.xml                    |   1 +
 .../FlinkBatchTransformTranslators.java         |  34 ++++-
 .../FlinkStreamingTransformTranslators.java     |  25 +++-
 runners/spark/pom.xml                           |   1 +
 .../spark/translation/TransformTranslator.java  |  23 +++
 .../beam/sdk/metrics/MetricsEnvironment.java    |  60 ++++++--
 .../beam/sdk/testing/UsesStatefulParDo.java     |  25 ++++
 .../beam/sdk/util/common/ReflectHelpers.java    |   3 +-
 .../sdk/metrics/MetricsEnvironmentTest.java     |   8 +-
 .../apache/beam/sdk/metrics/MetricsTest.java    |   6 +-
 .../sdk/runners/TransformHierarchyTest.java     | 142 +++++++++++++++++++
 .../src/main/java/StarterPipeline.java          |  18 +--
 .../src/main/java/it/pkg/StarterPipeline.java   |  18 +--
 19 files changed, 407 insertions(+), 91 deletions(-)
----------------------------------------------------------------------



[04/11] incubator-beam git commit: Update StarterPipeline

Posted by da...@apache.org.
Update StarterPipeline

Convert StarterPipeline ParDo to MapElements.

Use the new DoFn for non-outputting transforms.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fc8d65a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fc8d65a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fc8d65a

Branch: refs/heads/python-sdk
Commit: 1fc8d65a079e58d740a9b954da980963f20e9edf
Parents: 9b9d016
Author: Scott Wegner <sw...@google.com>
Authored: Mon Nov 21 16:33:07 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:01:31 2016 -0800

----------------------------------------------------------------------
 .../src/main/java/StarterPipeline.java            | 18 ++++++++++--------
 .../src/main/java/it/pkg/StarterPipeline.java     | 18 ++++++++++--------
 2 files changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fc8d65a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
index 0b21aa6..d6afdec 100644
--- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
@@ -20,13 +20,15 @@ package ${package};
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A starter example for writing Google Cloud Dataflow programs.
+ * A starter example for writing Beam programs.
  *
  * <p>The example takes two strings, converts them to their upper-case
  * representation and logs them.
@@ -39,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * Platform, you should specify the following command-line options:
  *   --project=<YOUR_PROJECT_ID>
  *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- *   --runner=BlockingDataflowRunner
+ *   --runner=DataflowRunner
  */
 public class StarterPipeline {
   private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
@@ -49,14 +51,14 @@ public class StarterPipeline {
         PipelineOptionsFactory.fromArgs(args).withValidation().create());
 
     p.apply(Create.of("Hello", "World"))
-    .apply(ParDo.of(new OldDoFn<String, String>() {
+    .apply(MapElements.via(new SimpleFunction<String, String>() {
       @Override
-      public void processElement(ProcessContext c) {
-        c.output(c.element().toUpperCase());
+      public String apply(String input) {
+        return input.toUpperCase();
       }
     }))
-    .apply(ParDo.of(new OldDoFn<String, Void>() {
-      @Override
+    .apply(ParDo.of(new DoFn<String, Void>() {
+      @ProcessElement
       public void processElement(ProcessContext c)  {
         LOG.info(c.element());
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fc8d65a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
index b332442..4ae92e8 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
@@ -20,13 +20,15 @@ package it.pkg;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A starter example for writing Google Cloud Dataflow programs.
+ * A starter example for writing Beam programs.
  *
  * <p>The example takes two strings, converts them to their upper-case
  * representation and logs them.
@@ -39,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * Platform, you should specify the following command-line options:
  *   --project=<YOUR_PROJECT_ID>
  *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- *   --runner=BlockingDataflowRunner
+ *   --runner=DataflowRunner
  */
 public class StarterPipeline {
   private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
@@ -49,14 +51,14 @@ public class StarterPipeline {
         PipelineOptionsFactory.fromArgs(args).withValidation().create());
 
     p.apply(Create.of("Hello", "World"))
-    .apply(ParDo.of(new OldDoFn<String, String>() {
+    .apply(MapElements.via(new SimpleFunction<String, String>() {
       @Override
-      public void processElement(ProcessContext c) {
-        c.output(c.element().toUpperCase());
+      public String apply(String input) {
+        return input.toUpperCase();
       }
     }))
-    .apply(ParDo.of(new OldDoFn<String, Void>() {
-      @Override
+    .apply(ParDo.of(new DoFn<String, Void>() {
+      @ProcessElement
       public void processElement(ProcessContext c)  {
         LOG.info(c.element());
       }


[06/11] incubator-beam git commit: Reject stateful DoFn in ApexRunner

Posted by da...@apache.org.
Reject stateful DoFn in ApexRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/796ba7ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/796ba7ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/796ba7ab

Branch: refs/heads/python-sdk
Commit: 796ba7ab75bc8d01a3a59efc29cdc17bcd26af4a
Parents: 413a402
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:01 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  1 +
 .../translation/ParDoBoundMultiTranslator.java  | 67 +++++++++++++-------
 .../apex/translation/ParDoBoundTranslator.java  | 46 +++++++++-----
 3 files changed, 74 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/796ba7ab/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 5478b24..d0b0fdf 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -185,6 +185,7 @@
             </goals>
             <configuration>
               <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+              <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
               <parallel>none</parallel>
               <failIfNoTests>true</failIfNoTests>
               <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/796ba7ab/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 7c91b91..fed5f4b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -23,17 +23,17 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.OutputPort;
 import com.google.common.collect.Maps;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
@@ -53,20 +53,35 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
 
   @Override
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+    OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
     PCollectionTuple output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
-    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
+    WindowedValueCoder<InputT> wvInputCoder =
+        FullWindowedValueCoder.of(
+            inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
-        context.getPipelineOptions(),
-        doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
-        context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder,
-        context.<Void>stateInternalsFactory()
-        );
+    ApexParDoOperator<InputT, OutputT> operator =
+        new ApexParDoOperator<>(
+            context.getPipelineOptions(),
+            oldDoFn,
+            transform.getMainOutputTag(),
+            transform.getSideOutputTags().getAll(),
+            context.<PCollection<?>>getInput().getWindowingStrategy(),
+            sideInputs,
+            wvInputCoder,
+            context.<Void>stateInternalsFactory());
 
     Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
     Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
@@ -91,7 +106,9 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     }
   }
 
-  static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
+  static void addSideInputs(
+      ApexParDoOperator<?, ?> operator,
+      List<PCollectionView<?>> sideInputs,
       TranslationContext context) {
     Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
     if (sideInputs.size() > sideInputPorts.length) {
@@ -105,8 +122,8 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     }
   }
 
-  private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
+  private static PCollection<?> unionSideInputs(
+      List<PCollectionView<?>> sideInputs, TranslationContext context) {
     checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
     // flatten and assign union tag
     List<PCollection<Object>> sourceCollections = new ArrayList<>();
@@ -115,13 +132,16 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     for (int i = 0; i < sideInputs.size(); i++) {
       PCollectionView<?> sideInput = sideInputs.get(i);
       PCollection<?> sideInputCollection = context.getViewInput(sideInput);
-      if (!sideInputCollection.getWindowingStrategy().equals(
-          firstSideInput.getWindowingStrategy())) {
+      if (!sideInputCollection
+          .getWindowingStrategy()
+          .equals(firstSideInput.getWindowingStrategy())) {
         // TODO: check how to handle this in stream codec
         //String msg = "Multiple side inputs with different window strategies.";
         //throw new UnsupportedOperationException(msg);
-        LOG.warn("Side inputs union with different windowing strategies {} {}",
-            firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
+        LOG.warn(
+            "Side inputs union with different windowing strategies {} {}",
+            firstSideInput.getWindowingStrategy(),
+            sideInputCollection.getWindowingStrategy());
       }
       if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
         String msg = "Multiple side inputs with different coders.";
@@ -131,12 +151,11 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
       unionTags.put(sideInputCollection, i);
     }
 
-    PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
-        firstSideInput, firstSideInput.getCoder());
-    FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
-        context);
+    PCollection<Object> resultCollection =
+        FlattenPCollectionTranslator.intermediateCollection(
+            firstSideInput, firstSideInput.getCoder());
+    FlattenPCollectionTranslator.flattenCollections(
+        sourceCollections, unionTags, resultCollection, context);
     return resultCollection;
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/796ba7ab/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index c1ebbd5..7a918a7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -19,12 +19,13 @@
 package org.apache.beam.runners.apex.translation;
 
 import java.util.List;
-
+import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
@@ -32,33 +33,46 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
-/**
- * {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}.
- */
-class ParDoBoundTranslator<InputT, OutputT> implements
-    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+/** {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. */
+class ParDoBoundTranslator<InputT, OutputT>
+    implements TransformTranslator<ParDo.Bound<InputT, OutputT>> {
   private static final long serialVersionUID = 1L;
 
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+    OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
     PCollection<OutputT> output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
-    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
+    WindowedValueCoder<InputT> wvInputCoder =
+        FullWindowedValueCoder.of(
+            inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
-        context.getPipelineOptions(),
-        doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
-        output.getWindowingStrategy(), sideInputs, wvInputCoder,
-        context.<Void>stateInternalsFactory()
-        );
+    ApexParDoOperator<InputT, OutputT> operator =
+        new ApexParDoOperator<>(
+            context.getPipelineOptions(),
+            oldDoFn,
+            new TupleTag<OutputT>(),
+            TupleTagList.empty().getAll() /*sideOutputTags*/,
+            output.getWindowingStrategy(),
+            sideInputs,
+            wvInputCoder,
+            context.<Void>stateInternalsFactory());
     context.addOperator(operator, operator.output);
     context.addStream(context.getInput(), operator.input);
     if (!sideInputs.isEmpty()) {
-       ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
+      ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
     }
   }
 }


[02/11] incubator-beam git commit: Reject stateful DoFn in SparkRunner

Posted by da...@apache.org.
Reject stateful DoFn in SparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4dd19782
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4dd19782
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4dd19782

Branch: refs/heads/python-sdk
Commit: 4dd19782f2624bf8aed3df8484fa314f94904571
Parents: 255ad9a
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:13 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:01:31 2016 -0800

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  1 +
 .../spark/translation/TransformTranslator.java  | 23 ++++++++++++++++++++
 2 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4dd19782/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 4c5b3f5..88223e2 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -72,6 +72,7 @@
                 </goals>
                 <configuration>
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                  <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
                   <forkCount>1</forkCount>
                   <reuseForks>false</reuseForks>
                   <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4dd19782/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index c902ee3..60d668e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -31,6 +31,7 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.io.SourceRDD;
@@ -47,12 +48,14 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -225,6 +228,16 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                  DoFn.StateId.class.getSimpleName(),
+                  doFn.getClass().getName(),
+                  DoFn.class.getSimpleName(),
+                  SparkRunner.class.getSimpleName()));
+        }
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@@ -247,6 +260,16 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                  DoFn.StateId.class.getSimpleName(),
+                  doFn.getClass().getName(),
+                  DoFn.class.getSimpleName(),
+                  SparkRunner.class.getSimpleName()));
+        }
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();


[03/11] incubator-beam git commit: Add JUnit category for stateful ParDo tests

Posted by da...@apache.org.
Add JUnit category for stateful ParDo tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/255ad9a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/255ad9a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/255ad9a3

Branch: refs/heads/python-sdk
Commit: 255ad9a327133ab4f05ebbceca236d5fe0006028
Parents: 1fc8d65
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 21 15:41:13 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:01:31 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/testing/UsesStatefulParDo.java     | 25 ++++++++++++++++++++
 1 file changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/255ad9a3/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
new file mode 100644
index 0000000..8bd6330
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize stateful {@link ParDo}.
+ */
+public interface UsesStatefulParDo {}


[08/11] incubator-beam git commit: Add TransformHierarchyTest

Posted by da...@apache.org.
Add TransformHierarchyTest

This tests basic features of TransformHierarchy


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dcd401ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dcd401ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dcd401ba

Branch: refs/heads/python-sdk
Commit: dcd401ba0b5bd12343484b0df50b15b6ef10ace9
Parents: f03b4fe
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 22 16:14:29 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800

----------------------------------------------------------------------
 .../sdk/runners/TransformHierarchyTest.java     | 142 +++++++++++++++++++
 1 file changed, 142 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dcd401ba/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
new file mode 100644
index 0000000..c28f23e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.runners;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link TransformHierarchy}.
+ */
+@RunWith(JUnit4.class)
+public class TransformHierarchyTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private TransformHierarchy hierarchy;
+  private TestPipeline pipeline;
+
+  @Before
+  public void setup() {
+    hierarchy = new TransformHierarchy();
+    pipeline = TestPipeline.create();
+  }
+
+  @Test
+  public void getCurrentNoPushReturnsRoot() {
+    assertThat(hierarchy.getCurrent().isRootNode(), is(true));
+  }
+
+  @Test
+  public void popWithoutPushThrows() {
+    thrown.expect(IllegalStateException.class);
+    hierarchy.popNode();
+  }
+
+  @Test
+  public void pushThenPopSucceeds() {
+    TransformTreeNode root = hierarchy.getCurrent();
+    TransformTreeNode node =
+        new TransformTreeNode(hierarchy.getCurrent(), Create.of(1), "Create", PBegin.in(pipeline));
+    hierarchy.pushNode(node);
+    assertThat(hierarchy.getCurrent(), equalTo(node));
+    hierarchy.popNode();
+    assertThat(hierarchy.getCurrent(), equalTo(root));
+  }
+
+  @Test
+  public void visitVisitsAllPushed() {
+    TransformTreeNode root = hierarchy.getCurrent();
+    Create.Values<Integer> create = Create.of(1);
+    PCollection<Integer> created = pipeline.apply(create);
+    PBegin begin = PBegin.in(pipeline);
+
+    TransformTreeNode compositeNode =
+        new TransformTreeNode(root, create, "Create", begin);
+    root.addComposite(compositeNode);
+    TransformTreeNode primitiveNode =
+        new TransformTreeNode(
+            compositeNode, Read.from(CountingSource.upTo(1L)), "Create/Read", begin);
+    compositeNode.addComposite(primitiveNode);
+
+    TransformTreeNode otherPrimitive =
+        new TransformTreeNode(
+            root, MapElements.via(new SimpleFunction<Integer, Integer>() {
+          @Override
+          public Integer apply(Integer input) {
+            return input;
+          }
+        }), "ParDo", created);
+    root.addComposite(otherPrimitive);
+    otherPrimitive.addInputProducer(created, primitiveNode);
+
+    hierarchy.pushNode(compositeNode);
+    hierarchy.pushNode(primitiveNode);
+    hierarchy.popNode();
+    hierarchy.popNode();
+    hierarchy.pushNode(otherPrimitive);
+    hierarchy.popNode();
+
+    final Set<TransformTreeNode> visitedCompositeNodes = new HashSet<>();
+    final Set<TransformTreeNode> visitedPrimitiveNodes = new HashSet<>();
+    final Set<PValue> visitedValuesInVisitor = new HashSet<>();
+
+    Set<PValue> visitedValues = new HashSet<>();
+    hierarchy.visit(new PipelineVisitor.Defaults() {
+      @Override
+      public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+        visitedCompositeNodes.add(node);
+        return CompositeBehavior.ENTER_TRANSFORM;
+      }
+
+      @Override
+      public void visitPrimitiveTransform(TransformTreeNode node) {
+        visitedPrimitiveNodes.add(node);
+      }
+
+      @Override
+      public void visitValue(PValue value, TransformTreeNode producer) {
+        visitedValuesInVisitor.add(value);
+      }
+    }, visitedValues);
+
+    assertThat(visitedCompositeNodes, containsInAnyOrder(root, compositeNode));
+    assertThat(visitedPrimitiveNodes, containsInAnyOrder(primitiveNode, otherPrimitive));
+    assertThat(visitedValuesInVisitor, Matchers.<PValue>containsInAnyOrder(created));
+  }
+}


[09/11] incubator-beam git commit: Use more natural class to find class loader in ReflectHelpers

Posted by da...@apache.org.
Use more natural class to find class loader in ReflectHelpers


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f86af61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f86af61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f86af61

Branch: refs/heads/python-sdk
Commit: 6f86af612f97ad57cf4ba2cae21ba232f7494ada
Parents: dcd401b
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 22 22:16:29 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f86af61/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
index 637e8e3..4ec39c1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
@@ -39,7 +39,6 @@ import java.util.Queue;
 import java.util.ServiceLoader;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.IOChannelUtils;
 
 /**
  * Utilities for working with with {@link Class Classes} and {@link Method Methods}.
@@ -225,7 +224,7 @@ public class ReflectHelpers {
   public static ClassLoader findClassLoader() {
     ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
     if (classLoader == null) {
-      classLoader = IOChannelUtils.class.getClassLoader();
+      classLoader = ReflectHelpers.class.getClassLoader();
     }
     if (classLoader == null) {
       classLoader = ClassLoader.getSystemClassLoader();