You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/11/24 00:03:01 UTC
[01/11] incubator-beam git commit: Reject stateful DoFn in FlinkRunner
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 9b9d016c8 -> 3dbeb8edf
Reject stateful DoFn in FlinkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/413a4024
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/413a4024
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/413a4024
Branch: refs/heads/python-sdk
Commit: 413a40243a30e059476395a2dcbfc98a94bb22f2
Parents: 4dd1978
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:28 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:01:31 2016 -0800
----------------------------------------------------------------------
runners/flink/runner/pom.xml | 1 +
.../FlinkBatchTransformTranslators.java | 34 +++++++++++++++++---
.../FlinkStreamingTransformTranslators.java | 25 +++++++++++++-
3 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/413a4024/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index adcb3de..c060c25 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -53,6 +53,7 @@
</goals>
<configuration>
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
<dependenciesToScan>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/413a4024/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 935a9ac..474d4e3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
@@ -46,6 +47,7 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
@@ -54,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -487,11 +490,23 @@ class FlinkBatchTransformTranslators {
@Override
public void translateNode(
ParDo.Bound<InputT, OutputT> transform,
+
FlinkBatchTranslationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ FlinkRunner.class.getSimpleName()));
+ }
+
DataSet<WindowedValue<InputT>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- final OldDoFn<InputT, OutputT> doFn = transform.getFn();
+ final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
TypeInformation<WindowedValue<OutputT>> typeInformation =
context.getTypeInfo(context.getOutput(transform));
@@ -507,7 +522,7 @@ class FlinkBatchTransformTranslators {
FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
new FlinkDoFnFunction<>(
- doFn,
+ oldDoFn,
context.getOutput(transform).getWindowingStrategy(),
sideInputStrategies,
context.getPipelineOptions());
@@ -533,10 +548,21 @@ class FlinkBatchTransformTranslators {
public void translateNode(
ParDo.BoundMulti<InputT, OutputT> transform,
FlinkBatchTranslationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ FlinkRunner.class.getSimpleName()));
+ }
+
DataSet<WindowedValue<InputT>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- final OldDoFn<InputT, OutputT> doFn = transform.getFn();
+ final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
@@ -584,7 +610,7 @@ class FlinkBatchTransformTranslators {
@SuppressWarnings("unchecked")
FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
new FlinkMultiOutputDoFnFunction(
- doFn,
+ oldDoFn,
windowingStrategy,
sideInputStrategies,
context.getPipelineOptions(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/413a4024/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 687e9c8..40dfbb9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.flink.translation;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -51,6 +50,7 @@ import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
@@ -58,6 +58,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -311,6 +312,17 @@ public class FlinkStreamingTransformTranslators {
ParDo.Bound<InputT, OutputT> transform,
FlinkStreamingTranslationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ FlinkRunner.class.getSimpleName()));
+ }
+
WindowingStrategy<?, ?> windowingStrategy =
context.getOutput(transform).getWindowingStrategy();
@@ -460,6 +472,17 @@ public class FlinkStreamingTransformTranslators {
ParDo.BoundMulti<InputT, OutputT> transform,
FlinkStreamingTranslationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ FlinkRunner.class.getSimpleName()));
+ }
+
// we assume that the transformation does not change the windowing strategy.
WindowingStrategy<?, ?> windowingStrategy =
context.getInput(transform).getWindowingStrategy();
[05/11] incubator-beam git commit: Simplify the API for managing
MetricsEnvironment
Posted by da...@apache.org.
Simplify the API for managing MetricsEnvironment
1. setCurrentContainer returns the previous MetricsEnvironment
2. setCurrentContainer(null) resets the thread local
3. scopedCurrentContainer sets the container and returns a Closeable to
reset the previous container.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6fa8f658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6fa8f658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6fa8f658
Branch: refs/heads/python-sdk
Commit: 6fa8f658abaac1d3a983bfc3b8c09422159af8aa
Parents: 796ba7a
Author: bchambers <bc...@google.com>
Authored: Tue Nov 22 11:37:23 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/TransformExecutor.java | 5 +-
.../beam/sdk/metrics/MetricsEnvironment.java | 60 +++++++++++++++-----
.../sdk/metrics/MetricsEnvironmentTest.java | 8 +--
.../apache/beam/sdk/metrics/MetricsTest.java | 6 +-
4 files changed, 56 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 1704955..fb31cc9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
@@ -89,8 +90,7 @@ class TransformExecutor<T> implements Runnable {
@Override
public void run() {
MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName());
- MetricsEnvironment.setMetricsContainer(metricsContainer);
- try {
+ try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
@@ -117,7 +117,6 @@ class TransformExecutor<T> implements Runnable {
// Report the physical metrics from the end of this step.
context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
- MetricsEnvironment.unsetMetricsContainer();
transformEvaluationState.complete(this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index ef2660a8..7c06cbf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.metrics;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.slf4j.Logger;
@@ -29,11 +31,13 @@ import org.slf4j.LoggerFactory;
* returned objects to create and modify metrics.
*
* <p>The runner should create {@link MetricsContainer} for each context in which metrics are
- * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that
- * may update metrics within that step.
+ * reported (by step and name) and call {@link #setCurrentContainer} before invoking any code that
+ * may update metrics within that step. It should call {@link #setCurrentContainer} again to restore
+ * the previous container.
*
- * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to
- * the previous value) when exiting code that set the metrics container.
+ * <p>Alternatively, the runner can use {@link #scopedMetricsContainer(MetricsContainer)} to set the
+ * container for the current thread and get a {@link Closeable} that will restore the previous
+ * container when closed.
*/
public class MetricsEnvironment {
@@ -45,15 +49,20 @@ public class MetricsEnvironment {
private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD =
new ThreadLocal<MetricsContainer>();
- /** Set the {@link MetricsContainer} for the current thread. */
- public static void setMetricsContainer(MetricsContainer container) {
- CONTAINER_FOR_THREAD.set(container);
- }
-
-
- /** Clear the {@link MetricsContainer} for the current thread. */
- public static void unsetMetricsContainer() {
- CONTAINER_FOR_THREAD.remove();
+ /**
+ * Set the {@link MetricsContainer} for the current thread.
+ *
+ * @return The previous container for the current thread.
+ */
+ @Nullable
+ public static MetricsContainer setCurrentContainer(@Nullable MetricsContainer container) {
+ MetricsContainer previous = getCurrentContainer();
+ if (container == null) {
+ CONTAINER_FOR_THREAD.remove();
+ } else {
+ CONTAINER_FOR_THREAD.set(container);
+ }
+ return previous;
}
/** Called by the run to indicate whether metrics reporting is supported. */
@@ -62,6 +71,31 @@ public class MetricsEnvironment {
}
/**
+ * Set the {@link MetricsContainer} for the current thread.
+ *
+ * @return A {@link Closeable} that will reset the current container to the previous
+ * {@link MetricsContainer} when closed.
+ */
+ public static Closeable scopedMetricsContainer(MetricsContainer container) {
+ return new ScopedContainer(container);
+ }
+
+ private static class ScopedContainer implements Closeable {
+
+ @Nullable
+ private final MetricsContainer oldContainer;
+
+ private ScopedContainer(MetricsContainer newContainer) {
+ this.oldContainer = setCurrentContainer(newContainer);
+ }
+
+ @Override
+ public void close() throws IOException {
+ setCurrentContainer(oldContainer);
+ }
+ }
+
+ /**
* Return the {@link MetricsContainer} for the current thread.
*
* <p>May return null if metrics are not supported by the current runner or if the current thread
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
index 4200a20..0ce17b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
@@ -35,7 +35,7 @@ import org.junit.runners.JUnit4;
public class MetricsEnvironmentTest {
@After
public void teardown() {
- MetricsEnvironment.unsetMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(null);
}
@Test
@@ -44,11 +44,11 @@ public class MetricsEnvironmentTest {
MetricsContainer c1 = new MetricsContainer("step1");
MetricsContainer c2 = new MetricsContainer("step2");
- MetricsEnvironment.setMetricsContainer(c1);
+ MetricsEnvironment.setCurrentContainer(c1);
counter.inc();
- MetricsEnvironment.setMetricsContainer(c2);
+ MetricsEnvironment.setCurrentContainer(c2);
counter.dec();
- MetricsEnvironment.unsetMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(null);
MetricUpdates updates1 = c1.getUpdates();
MetricUpdates updates2 = c2.getUpdates();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8f658/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index d11b44d..732cb34 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -37,7 +37,7 @@ public class MetricsTest {
@After
public void tearDown() {
- MetricsEnvironment.unsetMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(null);
}
@Test
@@ -61,7 +61,7 @@ public class MetricsTest {
@Test
public void distributionToCell() {
MetricsContainer container = new MetricsContainer("step");
- MetricsEnvironment.setMetricsContainer(container);
+ MetricsEnvironment.setCurrentContainer(container);
Distribution distribution = Metrics.distribution(NS, NAME);
@@ -80,7 +80,7 @@ public class MetricsTest {
@Test
public void counterToCell() {
MetricsContainer container = new MetricsContainer("step");
- MetricsEnvironment.setMetricsContainer(container);
+ MetricsEnvironment.setCurrentContainer(container);
Counter counter = Metrics.counter(NS, NAME);
CounterCell cell = container.getCounter(METRIC_NAME);
counter.inc();
[10/11] incubator-beam git commit: Update transitive dependencies for
Apex 3.5.0 snapshot version.
Posted by da...@apache.org.
Update transitive dependencies for Apex 3.5.0 snapshot version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e03bb8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e03bb8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e03bb8a
Branch: refs/heads/python-sdk
Commit: 2e03bb8a136078064014a0a7101960f6d2019487
Parents: 6f86af6
Author: Thomas Weise <th...@apache.org>
Authored: Tue Nov 22 11:38:00 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:05 2016 -0800
----------------------------------------------------------------------
runners/apex/pom.xml | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e03bb8a/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index d0b0fdf..84185b8 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -216,11 +216,11 @@
<ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
- <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.1</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency>
- <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.2.1</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
- <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.2.0</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
[07/11] incubator-beam git commit: Output Keyed Bundles in
GroupAlsoByWindowEvaluator
Posted by da...@apache.org.
Output Keyed Bundles in GroupAlsoByWindowEvaluator
This allows reuse of keys for downstream serialization.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f03b4fe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f03b4fe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f03b4fe1
Branch: refs/heads/python-sdk
Commit: f03b4fe11cb605edf216903738a6c305b3a91066
Parents: 6fa8f65
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 22 14:51:39 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800
----------------------------------------------------------------------
.../main/java/org/apache/beam/runners/direct/DirectRunner.java | 5 ++++-
.../beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java | 4 +++-
2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f03b4fe1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 0060e84..cb31947 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -310,7 +311,9 @@ public class DirectRunner
KeyedPValueTrackingVisitor keyedPValueVisitor =
KeyedPValueTrackingVisitor.create(
ImmutableSet.<Class<? extends PTransform>>of(
- GroupByKey.class, DirectGroupByKeyOnly.class));
+ GBKIntoKeyedWorkItems.class,
+ DirectGroupByKeyOnly.class,
+ DirectGroupAlsoByWindow.class));
pipeline.traverseTopologically(keyedPValueVisitor);
DisplayDataValidator.validatePipeline(pipeline);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f03b4fe1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index b946e4d..36c742b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -112,6 +112,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
private @SuppressWarnings("unchecked") final WindowingStrategy<?, BoundedWindow>
windowingStrategy;
+ private final StructuralKey<?> structuralKey;
private final Collection<UncommittedBundle<?>> outputBundles;
private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements;
private final AggregatorContainer.Mutator aggregatorChanges;
@@ -130,6 +131,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
this.evaluationContext = evaluationContext;
this.application = application;
+ structuralKey = inputBundle.getKey();
stepContext = evaluationContext
.getExecutionContext(application, inputBundle.getKey())
.getOrCreateStepContext(
@@ -159,7 +161,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
K key = workItem.key();
UncommittedBundle<KV<K, Iterable<V>>> bundle =
- evaluationContext.createBundle(application.getOutput());
+ evaluationContext.createKeyedBundle(structuralKey, application.getOutput());
outputBundles.add(bundle);
CopyOnAccessInMemoryStateInternals<K> stateInternals =
(CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();
[11/11] incubator-beam git commit: This closes #1432
Posted by da...@apache.org.
This closes #1432
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3dbeb8ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3dbeb8ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3dbeb8ed
Branch: refs/heads/python-sdk
Commit: 3dbeb8edfdfe4c9e8987e4d8df4451fdb748dc07
Parents: 9b9d016 2e03bb8
Author: Davor Bonaci <da...@google.com>
Authored: Wed Nov 23 16:02:41 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:41 2016 -0800
----------------------------------------------------------------------
runners/apex/pom.xml | 7 +-
.../translation/ParDoBoundMultiTranslator.java | 67 +++++----
.../apex/translation/ParDoBoundTranslator.java | 46 +++---
.../beam/runners/direct/DirectRunner.java | 5 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 4 +-
.../beam/runners/direct/TransformExecutor.java | 5 +-
runners/flink/runner/pom.xml | 1 +
.../FlinkBatchTransformTranslators.java | 34 ++++-
.../FlinkStreamingTransformTranslators.java | 25 +++-
runners/spark/pom.xml | 1 +
.../spark/translation/TransformTranslator.java | 23 +++
.../beam/sdk/metrics/MetricsEnvironment.java | 60 ++++++--
.../beam/sdk/testing/UsesStatefulParDo.java | 25 ++++
.../beam/sdk/util/common/ReflectHelpers.java | 3 +-
.../sdk/metrics/MetricsEnvironmentTest.java | 8 +-
.../apache/beam/sdk/metrics/MetricsTest.java | 6 +-
.../sdk/runners/TransformHierarchyTest.java | 142 +++++++++++++++++++
.../src/main/java/StarterPipeline.java | 18 +--
.../src/main/java/it/pkg/StarterPipeline.java | 18 +--
19 files changed, 407 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
[04/11] incubator-beam git commit: Update StarterPipeline
Posted by da...@apache.org.
Update StarterPipeline
Convert StarterPipeline ParDo to MapElements.
Use the new DoFn for non-outputting transforms.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fc8d65a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fc8d65a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fc8d65a
Branch: refs/heads/python-sdk
Commit: 1fc8d65a079e58d740a9b954da980963f20e9edf
Parents: 9b9d016
Author: Scott Wegner <sw...@google.com>
Authored: Mon Nov 21 16:33:07 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:01:31 2016 -0800
----------------------------------------------------------------------
.../src/main/java/StarterPipeline.java | 18 ++++++++++--------
.../src/main/java/it/pkg/StarterPipeline.java | 18 ++++++++++--------
2 files changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fc8d65a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
index 0b21aa6..d6afdec 100644
--- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
@@ -20,13 +20,15 @@ package ${package};
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A starter example for writing Google Cloud Dataflow programs.
+ * A starter example for writing Beam programs.
*
* <p>The example takes two strings, converts them to their upper-case
* representation and logs them.
@@ -39,7 +41,7 @@ import org.slf4j.LoggerFactory;
* Platform, you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- * --runner=BlockingDataflowRunner
+ * --runner=DataflowRunner
*/
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
@@ -49,14 +51,14 @@ public class StarterPipeline {
PipelineOptionsFactory.fromArgs(args).withValidation().create());
p.apply(Create.of("Hello", "World"))
- .apply(ParDo.of(new OldDoFn<String, String>() {
+ .apply(MapElements.via(new SimpleFunction<String, String>() {
@Override
- public void processElement(ProcessContext c) {
- c.output(c.element().toUpperCase());
+ public String apply(String input) {
+ return input.toUpperCase();
}
}))
- .apply(ParDo.of(new OldDoFn<String, Void>() {
- @Override
+ .apply(ParDo.of(new DoFn<String, Void>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
LOG.info(c.element());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fc8d65a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
index b332442..4ae92e8 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
@@ -20,13 +20,15 @@ package it.pkg;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A starter example for writing Google Cloud Dataflow programs.
+ * A starter example for writing Beam programs.
*
* <p>The example takes two strings, converts them to their upper-case
* representation and logs them.
@@ -39,7 +41,7 @@ import org.slf4j.LoggerFactory;
* Platform, you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- * --runner=BlockingDataflowRunner
+ * --runner=DataflowRunner
*/
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
@@ -49,14 +51,14 @@ public class StarterPipeline {
PipelineOptionsFactory.fromArgs(args).withValidation().create());
p.apply(Create.of("Hello", "World"))
- .apply(ParDo.of(new OldDoFn<String, String>() {
+ .apply(MapElements.via(new SimpleFunction<String, String>() {
@Override
- public void processElement(ProcessContext c) {
- c.output(c.element().toUpperCase());
+ public String apply(String input) {
+ return input.toUpperCase();
}
}))
- .apply(ParDo.of(new OldDoFn<String, Void>() {
- @Override
+ .apply(ParDo.of(new DoFn<String, Void>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
LOG.info(c.element());
}
[06/11] incubator-beam git commit: Reject stateful DoFn in ApexRunner
Posted by da...@apache.org.
Reject stateful DoFn in ApexRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/796ba7ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/796ba7ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/796ba7ab
Branch: refs/heads/python-sdk
Commit: 796ba7ab75bc8d01a3a59efc29cdc17bcd26af4a
Parents: 413a402
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:01 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800
----------------------------------------------------------------------
runners/apex/pom.xml | 1 +
.../translation/ParDoBoundMultiTranslator.java | 67 +++++++++++++-------
.../apex/translation/ParDoBoundTranslator.java | 46 +++++++++-----
3 files changed, 74 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/796ba7ab/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 5478b24..d0b0fdf 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -185,6 +185,7 @@
</goals>
<configuration>
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
<dependenciesToScan>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/796ba7ab/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 7c91b91..fed5f4b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -23,17 +23,17 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.OutputPort;
import com.google.common.collect.Maps;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
@@ -53,20 +53,35 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
@Override
public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
- OldDoFn<InputT, OutputT> doFn = transform.getFn();
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ ApexRunner.class.getSimpleName()));
+ }
+ OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
PCollectionTuple output = context.getOutput();
PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
- WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
- input.getWindowingStrategy().getWindowFn().windowCoder());
+ WindowedValueCoder<InputT> wvInputCoder =
+ FullWindowedValueCoder.of(
+ inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
- context.getPipelineOptions(),
- doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
- context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder,
- context.<Void>stateInternalsFactory()
- );
+ ApexParDoOperator<InputT, OutputT> operator =
+ new ApexParDoOperator<>(
+ context.getPipelineOptions(),
+ oldDoFn,
+ transform.getMainOutputTag(),
+ transform.getSideOutputTags().getAll(),
+ context.<PCollection<?>>getInput().getWindowingStrategy(),
+ sideInputs,
+ wvInputCoder,
+ context.<Void>stateInternalsFactory());
Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
@@ -91,7 +106,9 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
}
}
- static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
+ static void addSideInputs(
+ ApexParDoOperator<?, ?> operator,
+ List<PCollectionView<?>> sideInputs,
TranslationContext context) {
Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
if (sideInputs.size() > sideInputPorts.length) {
@@ -105,8 +122,8 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
}
}
- private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
- TranslationContext context) {
+ private static PCollection<?> unionSideInputs(
+ List<PCollectionView<?>> sideInputs, TranslationContext context) {
checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
// flatten and assign union tag
List<PCollection<Object>> sourceCollections = new ArrayList<>();
@@ -115,13 +132,16 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
for (int i = 0; i < sideInputs.size(); i++) {
PCollectionView<?> sideInput = sideInputs.get(i);
PCollection<?> sideInputCollection = context.getViewInput(sideInput);
- if (!sideInputCollection.getWindowingStrategy().equals(
- firstSideInput.getWindowingStrategy())) {
+ if (!sideInputCollection
+ .getWindowingStrategy()
+ .equals(firstSideInput.getWindowingStrategy())) {
// TODO: check how to handle this in stream codec
//String msg = "Multiple side inputs with different window strategies.";
//throw new UnsupportedOperationException(msg);
- LOG.warn("Side inputs union with different windowing strategies {} {}",
- firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
+ LOG.warn(
+ "Side inputs union with different windowing strategies {} {}",
+ firstSideInput.getWindowingStrategy(),
+ sideInputCollection.getWindowingStrategy());
}
if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
String msg = "Multiple side inputs with different coders.";
@@ -131,12 +151,11 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
unionTags.put(sideInputCollection, i);
}
- PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
- firstSideInput, firstSideInput.getCoder());
- FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
- context);
+ PCollection<Object> resultCollection =
+ FlattenPCollectionTranslator.intermediateCollection(
+ firstSideInput, firstSideInput.getCoder());
+ FlattenPCollectionTranslator.flattenCollections(
+ sourceCollections, unionTags, resultCollection, context);
return resultCollection;
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/796ba7ab/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index c1ebbd5..7a918a7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -19,12 +19,13 @@
package org.apache.beam.runners.apex.translation;
import java.util.List;
-
+import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
@@ -32,33 +33,46 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-/**
- * {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}.
- */
-class ParDoBoundTranslator<InputT, OutputT> implements
- TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+/** {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. */
+class ParDoBoundTranslator<InputT, OutputT>
+ implements TransformTranslator<ParDo.Bound<InputT, OutputT>> {
private static final long serialVersionUID = 1L;
@Override
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
- OldDoFn<InputT, OutputT> doFn = transform.getFn();
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ ApexRunner.class.getSimpleName()));
+ }
+ OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
PCollection<OutputT> output = context.getOutput();
PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
- WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
- input.getWindowingStrategy().getWindowFn().windowCoder());
+ WindowedValueCoder<InputT> wvInputCoder =
+ FullWindowedValueCoder.of(
+ inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
- context.getPipelineOptions(),
- doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
- output.getWindowingStrategy(), sideInputs, wvInputCoder,
- context.<Void>stateInternalsFactory()
- );
+ ApexParDoOperator<InputT, OutputT> operator =
+ new ApexParDoOperator<>(
+ context.getPipelineOptions(),
+ oldDoFn,
+ new TupleTag<OutputT>(),
+ TupleTagList.empty().getAll() /*sideOutputTags*/,
+ output.getWindowingStrategy(),
+ sideInputs,
+ wvInputCoder,
+ context.<Void>stateInternalsFactory());
context.addOperator(operator, operator.output);
context.addStream(context.getInput(), operator.input);
if (!sideInputs.isEmpty()) {
- ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
+ ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
}
}
}
[02/11] incubator-beam git commit: Reject stateful DoFn in SparkRunner
Posted by da...@apache.org.
Reject stateful DoFn in SparkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4dd19782
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4dd19782
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4dd19782
Branch: refs/heads/python-sdk
Commit: 4dd19782f2624bf8aed3df8484fa314f94904571
Parents: 255ad9a
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:13 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:01:31 2016 -0800
----------------------------------------------------------------------
runners/spark/pom.xml | 1 +
.../spark/translation/TransformTranslator.java | 23 ++++++++++++++++++++
2 files changed, 24 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4dd19782/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 4c5b3f5..88223e2 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -72,6 +72,7 @@
</goals>
<configuration>
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<failIfNoTests>true</failIfNoTests>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4dd19782/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index c902ee3..60d668e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -31,6 +31,7 @@ import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.io.SourceRDD;
@@ -47,12 +48,14 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -225,6 +228,16 @@ public final class TransformTranslator {
return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
@Override
public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ SparkRunner.class.getSimpleName()));
+ }
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<InputT>> inRDD =
((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@@ -247,6 +260,16 @@ public final class TransformTranslator {
return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
@Override
public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ SparkRunner.class.getSimpleName()));
+ }
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<InputT>> inRDD =
((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
[03/11] incubator-beam git commit: Add JUnit category for stateful
ParDo tests
Posted by da...@apache.org.
Add JUnit category for stateful ParDo tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/255ad9a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/255ad9a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/255ad9a3
Branch: refs/heads/python-sdk
Commit: 255ad9a327133ab4f05ebbceca236d5fe0006028
Parents: 1fc8d65
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 21 15:41:13 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:01:31 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/testing/UsesStatefulParDo.java | 25 ++++++++++++++++++++
1 file changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/255ad9a3/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
new file mode 100644
index 0000000..8bd6330
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize stateful {@link ParDo}.
+ */
+public interface UsesStatefulParDo {}
[08/11] incubator-beam git commit: Add TransformHierarchyTest
Posted by da...@apache.org.
Add TransformHierarchyTest
This tests basic features of TransformHierarchy
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dcd401ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dcd401ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dcd401ba
Branch: refs/heads/python-sdk
Commit: dcd401ba0b5bd12343484b0df50b15b6ef10ace9
Parents: f03b4fe
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 22 16:14:29 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800
----------------------------------------------------------------------
.../sdk/runners/TransformHierarchyTest.java | 142 +++++++++++++++++++
1 file changed, 142 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dcd401ba/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
new file mode 100644
index 0000000..c28f23e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.runners;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link TransformHierarchy}.
+ */
+@RunWith(JUnit4.class)
+public class TransformHierarchyTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ private TransformHierarchy hierarchy;
+ private TestPipeline pipeline;
+
+ @Before
+ public void setup() {
+ hierarchy = new TransformHierarchy();
+ pipeline = TestPipeline.create();
+ }
+
+ @Test
+ public void getCurrentNoPushReturnsRoot() {
+ assertThat(hierarchy.getCurrent().isRootNode(), is(true));
+ }
+
+ @Test
+ public void popWithoutPushThrows() {
+ thrown.expect(IllegalStateException.class);
+ hierarchy.popNode();
+ }
+
+ @Test
+ public void pushThenPopSucceeds() {
+ TransformTreeNode root = hierarchy.getCurrent();
+ TransformTreeNode node =
+ new TransformTreeNode(hierarchy.getCurrent(), Create.of(1), "Create", PBegin.in(pipeline));
+ hierarchy.pushNode(node);
+ assertThat(hierarchy.getCurrent(), equalTo(node));
+ hierarchy.popNode();
+ assertThat(hierarchy.getCurrent(), equalTo(root));
+ }
+
+ @Test
+ public void visitVisitsAllPushed() {
+ TransformTreeNode root = hierarchy.getCurrent();
+ Create.Values<Integer> create = Create.of(1);
+ PCollection<Integer> created = pipeline.apply(create);
+ PBegin begin = PBegin.in(pipeline);
+
+ TransformTreeNode compositeNode =
+ new TransformTreeNode(root, create, "Create", begin);
+ root.addComposite(compositeNode);
+ TransformTreeNode primitiveNode =
+ new TransformTreeNode(
+ compositeNode, Read.from(CountingSource.upTo(1L)), "Create/Read", begin);
+ compositeNode.addComposite(primitiveNode);
+
+ TransformTreeNode otherPrimitive =
+ new TransformTreeNode(
+ root, MapElements.via(new SimpleFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer input) {
+ return input;
+ }
+ }), "ParDo", created);
+ root.addComposite(otherPrimitive);
+ otherPrimitive.addInputProducer(created, primitiveNode);
+
+ hierarchy.pushNode(compositeNode);
+ hierarchy.pushNode(primitiveNode);
+ hierarchy.popNode();
+ hierarchy.popNode();
+ hierarchy.pushNode(otherPrimitive);
+ hierarchy.popNode();
+
+ final Set<TransformTreeNode> visitedCompositeNodes = new HashSet<>();
+ final Set<TransformTreeNode> visitedPrimitiveNodes = new HashSet<>();
+ final Set<PValue> visitedValuesInVisitor = new HashSet<>();
+
+ Set<PValue> visitedValues = new HashSet<>();
+ hierarchy.visit(new PipelineVisitor.Defaults() {
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+ visitedCompositeNodes.add(node);
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformTreeNode node) {
+ visitedPrimitiveNodes.add(node);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformTreeNode producer) {
+ visitedValuesInVisitor.add(value);
+ }
+ }, visitedValues);
+
+ assertThat(visitedCompositeNodes, containsInAnyOrder(root, compositeNode));
+ assertThat(visitedPrimitiveNodes, containsInAnyOrder(primitiveNode, otherPrimitive));
+ assertThat(visitedValuesInVisitor, Matchers.<PValue>containsInAnyOrder(created));
+ }
+}
[09/11] incubator-beam git commit: Use more natural class to find
class loader in ReflectHelpers
Posted by da...@apache.org.
Use more natural class to find class loader in ReflectHelpers
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f86af61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f86af61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f86af61
Branch: refs/heads/python-sdk
Commit: 6f86af612f97ad57cf4ba2cae21ba232f7494ada
Parents: dcd401b
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 22 22:16:29 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f86af61/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
index 637e8e3..4ec39c1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
@@ -39,7 +39,6 @@ import java.util.Queue;
import java.util.ServiceLoader;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.IOChannelUtils;
/**
* Utilities for working with with {@link Class Classes} and {@link Method Methods}.
@@ -225,7 +224,7 @@ public class ReflectHelpers {
public static ClassLoader findClassLoader() {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
- classLoader = IOChannelUtils.class.getClassLoader();
+ classLoader = ReflectHelpers.class.getClassLoader();
}
if (classLoader == null) {
classLoader = ClassLoader.getSystemClassLoader();