You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/24 16:10:55 UTC
[01/14] incubator-beam git commit: Add adapter from
OldDoFn.RequiresWindowAccess to DoFn
Repository: incubator-beam
Updated Branches:
refs/heads/master 8dfadbf01 -> c390a2a7f
Add adapter from OldDoFn.RequiresWindowAccess to DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/164ee56b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/164ee56b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/164ee56b
Branch: refs/heads/master
Commit: 164ee56b41e01c0ee637eff24e23a814b5885e6f
Parents: a9a41eb
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 19:45:18 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:50 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/OldDoFn.java | 89 +++++++++++++++++---
1 file changed, 76 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/164ee56b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index e3cfc38..72c2965 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -77,19 +77,12 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
public DoFn<InputT, OutputT> toDoFn() {
if (this instanceof RequiresWindowAccess) {
- throw new UnsupportedOperationException(
- String.format(
- "Cannot convert %s to %s because it implements %s."
- + " Please convert your %s to a %s directly.",
- getClass(),
- DoFn.class.getSimpleName(),
- RequiresWindowAccess.class.getSimpleName(),
- OldDoFn.class.getSimpleName(),
- DoFn.class.getSimpleName()));
- }
-
- // No parameters as it just accesses `this`
- return new AdaptedDoFn();
+ // No parameters as it just accesses `this`
+ return new AdaptedRequiresWindowAccessDoFn();
+ } else {
+ // No parameters as it just accesses `this`
+ return new AdaptedDoFn();
+ }
}
/**
@@ -770,4 +763,74 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
return OldDoFn.this.getOutputTypeDescriptor();
}
}
+
+ /**
+ * A {@link ProcessContext} for an {@link OldDoFn} that implements
+ * {@link OldDoFn.RequiresWindowAccess}, via a context for a proper {@link DoFn}.
+ */
+ private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
+
+ private final BoundedWindow window;
+
+ public AdaptedRequiresWindowAccessProcessContext(
+ DoFn<InputT, OutputT>.ProcessContext newContext,
+ BoundedWindow window) {
+ super(newContext);
+ this.window = window;
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return window;
+ }
+ }
+
+ private class AdaptedRequiresWindowAccessDoFn extends DoFn<InputT, OutputT> {
+
+ @Setup
+ public void setup() throws Exception {
+ OldDoFn.this.setup();
+ }
+
+ @StartBundle
+ public void startBundle(Context c) throws Exception {
+ OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ OldDoFn.this.processElement(
+ OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window));
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
+ }
+
+ @Teardown
+ public void teardown() throws Exception {
+ OldDoFn.this.teardown();
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return OldDoFn.this.getAllowedTimestampSkew();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ OldDoFn.this.populateDisplayData(builder);
+ }
+
+ @Override
+ protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+ return OldDoFn.this.getInputTypeDescriptor();
+ }
+
+ @Override
+ protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ return OldDoFn.this.getOutputTypeDescriptor();
+ }
+ }
}
[05/14] incubator-beam git commit: Add setupDelegatingAggregators for
DoFn (for now)
Posted by ke...@apache.org.
Add setupDelegatingAggregators for DoFn (for now)
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2e751f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2e751f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2e751f4
Branch: refs/heads/master
Commit: c2e751f49d72968f2478931cdb884fd4af173610
Parents: 08dd149
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:53:29 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 1 +
.../org/apache/beam/sdk/transforms/DoFn.java | 24 ++++++++++++++++++++
2 files changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 0360bc2..1cf56a6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -228,6 +228,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
this.stepContext = stepContext;
this.aggregatorFactory = aggregatorFactory;
this.windowFn = windowFn;
+ super.setupDelegateAggregators();
}
//////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2e751f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 0531cbb..11ca853 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -214,6 +214,30 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
protected abstract <AggInputT, AggOutputT>
Aggregator<AggInputT, AggOutputT> createAggregator(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
+
+ /**
+ * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context.
+ *
+ * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method.
+ */
+ @Experimental(Kind.AGGREGATOR)
+ protected final void setupDelegateAggregators() {
+ for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
+ setupDelegateAggregator(aggregator);
+ }
+
+ aggregatorsAreFinal = true;
+ }
+
+ private <AggInputT, AggOutputT> void setupDelegateAggregator(
+ DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
+
+ Aggregator<AggInputT, AggOutputT> delegate = createAggregator(
+ aggregator.getName(), aggregator.getCombineFn());
+
+ aggregator.setDelegate(delegate);
+ }
+
}
/**
[12/14] incubator-beam git commit: Port direct runner to use new DoFn
directly
Posted by ke...@apache.org.
Port direct runner to use new DoFn directly
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1919d8b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1919d8b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1919d8b3
Branch: refs/heads/master
Commit: 1919d8b3a850bd146137652546da851ee461cd28
Parents: f0c8d30
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 20:55:00 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700
----------------------------------------------------------------------
.../runners/direct/DoFnLifecycleManager.java | 42 +++++++++--------
.../beam/runners/direct/ParDoEvaluator.java | 3 +-
.../direct/ParDoMultiEvaluatorFactory.java | 6 +--
.../direct/ParDoSingleEvaluatorFactory.java | 5 +-
...leManagerRemovingTransformEvaluatorTest.java | 16 +++----
.../direct/DoFnLifecycleManagerTest.java | 12 ++---
.../direct/DoFnLifecycleManagersTest.java | 48 ++++++++++++++++----
.../direct/ParDoMultiEvaluatorFactoryTest.java | 11 +++++
.../direct/ParDoSingleEvaluatorFactoryTest.java | 11 +++++
.../beam/runners/direct/SplittableDoFnTest.java | 8 +++-
.../org/apache/beam/sdk/transforms/OldDoFn.java | 23 ++++++++--
11 files changed, 130 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
index 0e15c18..23460b6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
@@ -26,7 +26,9 @@ import java.util.Collection;
import java.util.Iterator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn.Setup;
+import org.apache.beam.sdk.transforms.DoFn.Teardown;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.SerializableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,49 +37,49 @@ import org.slf4j.LoggerFactory;
* Manages {@link DoFn} setup, teardown, and serialization.
*
* <p>{@link DoFnLifecycleManager} is similar to a {@link ThreadLocal} storing a {@link DoFn}, but
- * calls the {@link DoFn} {@link Setup} the first time the {@link DoFn} is obtained and {@link
- * Teardown} whenever the {@link DoFn} is removed, and provides a method for clearing all cached
- * {@link DoFn DoFns}.
+ * calls the {@link DoFn} {@link Setup @Setup} method the first time the {@link DoFn} is obtained
+ * and {@link Teardown @Teardown} whenever the {@link DoFn} is removed, and provides a method for
+ * clearing all cached {@link DoFn DoFns}.
*/
class DoFnLifecycleManager {
private static final Logger LOG = LoggerFactory.getLogger(DoFnLifecycleManager.class);
- public static DoFnLifecycleManager of(OldDoFn<?, ?> original) {
+ public static DoFnLifecycleManager of(DoFn<?, ?> original) {
return new DoFnLifecycleManager(original);
}
- private final LoadingCache<Thread, OldDoFn<?, ?>> outstanding;
+ private final LoadingCache<Thread, DoFn<?, ?>> outstanding;
- private DoFnLifecycleManager(OldDoFn<?, ?> original) {
+ private DoFnLifecycleManager(DoFn<?, ?> original) {
this.outstanding = CacheBuilder.newBuilder().build(new DeserializingCacheLoader(original));
}
- public OldDoFn<?, ?> get() throws Exception {
+ public DoFn<?, ?> get() throws Exception {
Thread currentThread = Thread.currentThread();
return outstanding.get(currentThread);
}
public void remove() throws Exception {
Thread currentThread = Thread.currentThread();
- OldDoFn<?, ?> fn = outstanding.asMap().remove(currentThread);
- fn.teardown();
+ DoFn<?, ?> fn = outstanding.asMap().remove(currentThread);
+ DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown();
}
/**
* Remove all {@link DoFn DoFns} from this {@link DoFnLifecycleManager}. Returns all exceptions
* that were thrown while calling the remove methods.
*
- * <p>If the returned Collection is nonempty, an exception was thrown from at least one
- * {@link DoFn#teardown()} method, and the {@link PipelineRunner} should throw an exception.
+ * <p>If the returned Collection is nonempty, an exception was thrown from at least one {@link
+ * DoFn.Teardown @Teardown} method, and the {@link PipelineRunner} should throw an exception.
*/
public Collection<Exception> removeAll() throws Exception {
- Iterator<OldDoFn<?, ?>> fns = outstanding.asMap().values().iterator();
+ Iterator<DoFn<?, ?>> fns = outstanding.asMap().values().iterator();
Collection<Exception> thrown = new ArrayList<>();
while (fns.hasNext()) {
- OldDoFn<?, ?> fn = fns.next();
+ DoFn<?, ?> fn = fns.next();
fns.remove();
try {
- fn.teardown();
+ DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown();
} catch (Exception e) {
thrown.add(e);
}
@@ -85,18 +87,18 @@ class DoFnLifecycleManager {
return thrown;
}
- private class DeserializingCacheLoader extends CacheLoader<Thread, OldDoFn<?, ?>> {
+ private class DeserializingCacheLoader extends CacheLoader<Thread, DoFn<?, ?>> {
private final byte[] original;
- public DeserializingCacheLoader(OldDoFn<?, ?> original) {
+ public DeserializingCacheLoader(DoFn<?, ?> original) {
this.original = SerializableUtils.serializeToByteArray(original);
}
@Override
- public OldDoFn<?, ?> load(Thread key) throws Exception {
- OldDoFn<?, ?> fn = (OldDoFn<?, ?>) SerializableUtils.deserializeFromByteArray(original,
+ public DoFn<?, ?> load(Thread key) throws Exception {
+ DoFn<?, ?> fn = (DoFn<?, ?>) SerializableUtils.deserializeFromByteArray(original,
"DoFn Copy in thread " + key.getName());
- fn.setup();
+ DoFnInvokers.INSTANCE.invokerFor(fn).invokeSetup();
return fn;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index a59fb4d..b524dfa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct;
import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -45,7 +46,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
DirectStepContext stepContext,
CommittedBundle<InputT> inputBundle,
AppliedPTransform<PCollection<InputT>, ?, ?> application,
- Object fn,
+ Serializable fn, // may be OldDoFn or DoFn
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index d909e8b..02469ff 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
import org.apache.beam.sdk.values.PCollection;
@@ -50,7 +50,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
throws Exception {
BoundMulti<?, ?> bound = (BoundMulti<?, ?>) key.getTransform();
- return DoFnLifecycleManager.of(bound.getFn());
+ return DoFnLifecycleManager.of(bound.getNewFn());
}
});
}
@@ -87,7 +87,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
stepContext,
inputBundle,
application,
- (OldDoFn) fnLocal.get(),
+ (DoFn) fnLocal.get(),
application.getTransform().getSideInputs(),
application.getTransform().getMainOutputTag(),
application.getTransform().getSideOutputTags().getAll(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 1a06ea6..0584e41 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -25,7 +25,6 @@ import java.util.Collections;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.Bound;
import org.apache.beam.sdk.values.PCollection;
@@ -52,7 +51,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
throws Exception {
Bound<?, ?> bound = (Bound<?, ?>) key.getTransform();
- return DoFnLifecycleManager.of(bound.getFn());
+ return DoFnLifecycleManager.of(bound.getNewFn());
}
});
}
@@ -92,7 +91,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
stepContext,
inputBundle,
application,
- (OldDoFn) fnLocal.get(),
+ fnLocal.get(),
application.getTransform().getSideInputs(),
mainOutputTag,
Collections.<TupleTag<?>>emptyList(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index 2e4fee2..9e2732e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -27,7 +27,7 @@ import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.hamcrest.Matchers;
import org.junit.Before;
@@ -50,7 +50,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void delegatesToUnderlying() throws Exception {
RecordingTransformEvaluator underlying = new RecordingTransformEvaluator();
- OldDoFn<?, ?> original = lifecycleManager.get();
+ DoFn<?, ?> original = lifecycleManager.get();
TransformEvaluator<Object> evaluator =
DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
WindowedValue<Object> first = WindowedValue.valueInGlobalWindow(new Object());
@@ -67,7 +67,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void removesOnExceptionInProcessElement() throws Exception {
ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
- OldDoFn<?, ?> original = lifecycleManager.get();
+ DoFn<?, ?> original = lifecycleManager.get();
assertThat(original, not(nullValue()));
TransformEvaluator<Object> evaluator =
DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
@@ -75,7 +75,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
try {
evaluator.processElement(WindowedValue.valueInGlobalWindow(new Object()));
} catch (Exception e) {
- assertThat(lifecycleManager.get(), not(Matchers.<OldDoFn<?, ?>>theInstance(original)));
+ assertThat(lifecycleManager.get(), not(Matchers.<DoFn<?, ?>>theInstance(original)));
return;
}
fail("Expected ThrowingTransformEvaluator to throw on method call");
@@ -84,7 +84,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void removesOnExceptionInFinishBundle() throws Exception {
ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
- OldDoFn<?, ?> original = lifecycleManager.get();
+ DoFn<?, ?> original = lifecycleManager.get();
// the LifecycleManager is set when the evaluator starts
assertThat(original, not(nullValue()));
TransformEvaluator<Object> evaluator =
@@ -94,7 +94,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
evaluator.finishBundle();
} catch (Exception e) {
assertThat(lifecycleManager.get(),
- Matchers.not(Matchers.<OldDoFn<?, ?>>theInstance(original)));
+ Matchers.not(Matchers.<DoFn<?, ?>>theInstance(original)));
return;
}
fail("Expected ThrowingTransformEvaluator to throw on method call");
@@ -134,8 +134,8 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
}
- private static class TestFn extends OldDoFn<Object, Object> {
- @Override
+ private static class TestFn extends DoFn<Object, Object> {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
index 1f0af99..aef9d29 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -101,7 +101,7 @@ public class DoFnLifecycleManagerTest {
assertThat(obtained.setupCalled, is(true));
assertThat(obtained.teardownCalled, is(true));
- assertThat(mgr.get(), not(Matchers.<OldDoFn<?, ?>>theInstance(obtained)));
+ assertThat(mgr.get(), not(Matchers.<DoFn<?, ?>>theInstance(obtained)));
}
@Test
@@ -142,11 +142,11 @@ public class DoFnLifecycleManagerTest {
}
- private static class TestFn extends OldDoFn<Object, Object> {
+ private static class TestFn extends DoFn<Object, Object> {
boolean setupCalled = false;
boolean teardownCalled = false;
- @Override
+ @Setup
public void setup() {
checkState(!setupCalled);
checkState(!teardownCalled);
@@ -154,11 +154,11 @@ public class DoFnLifecycleManagerTest {
setupCalled = true;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
}
- @Override
+ @Teardown
public void teardown() {
checkState(setupCalled);
checkState(!teardownCalled);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
index 39a4a9d..a19ff99 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
@@ -18,12 +18,15 @@
package org.apache.beam.runners.direct;
+import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collection;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.UserCodeException;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
@@ -51,9 +54,15 @@ public class DoFnLifecycleManagersTest {
third.get();
final Collection<Matcher<? super Throwable>> suppressions = new ArrayList<>();
- suppressions.add(new ThrowableMessageMatcher("foo"));
- suppressions.add(new ThrowableMessageMatcher("bar"));
- suppressions.add(new ThrowableMessageMatcher("baz"));
+ suppressions.add(allOf(
+ instanceOf(UserCodeException.class),
+ new CausedByMatcher(new ThrowableMessageMatcher("foo"))));
+ suppressions.add(allOf(
+ instanceOf(UserCodeException.class),
+ new CausedByMatcher(new ThrowableMessageMatcher("bar"))));
+ suppressions.add(allOf(
+ instanceOf(UserCodeException.class),
+ new CausedByMatcher(new ThrowableMessageMatcher("baz"))));
thrown.expect(
new BaseMatcher<Exception>() {
@@ -90,18 +99,18 @@ public class DoFnLifecycleManagersTest {
DoFnLifecycleManagers.removeAllFromManagers(ImmutableList.of(first, second, third));
}
- private static class ThrowsInCleanupFn extends OldDoFn<Object, Object> {
+ private static class ThrowsInCleanupFn extends DoFn<Object, Object> {
private final String message;
private ThrowsInCleanupFn(String message) {
this.message = message;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
}
- @Override
+ @Teardown
public void teardown() throws Exception {
throw new Exception(message);
}
@@ -130,9 +139,32 @@ public class DoFnLifecycleManagersTest {
}
}
+ private static class CausedByMatcher extends BaseMatcher<Throwable> {
+ private final Matcher<Throwable> causeMatcher;
+
+ public CausedByMatcher(
+ Matcher<Throwable> causeMatcher) {
+ this.causeMatcher = causeMatcher;
+ }
- private static class EmptyFn extends OldDoFn<Object, Object> {
@Override
+ public boolean matches(Object item) {
+ if (!(item instanceof UserCodeException)) {
+ return false;
+ }
+ UserCodeException that = (UserCodeException) item;
+ return causeMatcher.matches(that.getCause());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a throwable with a cause ").appendDescriptionOf(causeMatcher);
+ }
+ }
+
+
+ private static class EmptyFn extends DoFn<Object, Object> {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index 88e1484..8b0070b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -236,6 +237,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
}
+ /**
+ * This test is ignored because, today, the GroupByKey tests cover all the state that needs testing.
+ * It should be ported to the state API once that is ready.
+ */
+ @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
@Test
public void finishBundleWithStatePutsStateInResult() throws Exception {
TestPipeline p = TestPipeline.create();
@@ -321,6 +327,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
containsInAnyOrder("foo", "bara", "bazam"));
}
+ /**
+ * This test is ignored because, today, the GroupByKey tests cover all the state that needs testing.
+ * It should be ported to the state API once that is ready.
+ */
+ @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
@Test
public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
TestPipeline p = TestPipeline.create();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index 6a02e40..e562b28 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -166,6 +167,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
assertThat(result.getAggregatorChanges(), equalTo(mutator));
}
+ /**
+ * This test is ignored because, today, the GroupByKey tests cover all the state that needs testing.
+ * It should be ported to the state API once that is ready.
+ */
+ @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
@Test
public void finishBundleWithStatePutsStateInResult() throws Exception {
TestPipeline p = TestPipeline.create();
@@ -238,6 +244,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
containsInAnyOrder("foo", "bara", "bazam"));
}
+ /**
+ * This test is ignored because, today, the GroupByKey tests cover all the state that needs testing.
+ * It should be ported to the state API once that is ready.
+ */
+ @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
@Test
public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
TestPipeline p = TestPipeline.create();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
index 84a0cd9..c164ce6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,6 +45,7 @@ import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -140,6 +140,9 @@ public class SplittableDoFnTest {
}
}
+ @Ignore(
+ "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
+ + "It must be implemented as a primitive.")
@Test
public void testPairWithIndexBasic() throws ClassNotFoundException {
Pipeline p = TestPipeline.create();
@@ -164,6 +167,9 @@ public class SplittableDoFnTest {
p.run();
}
+ @Ignore(
+ "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
+ + "It must be implemented as a primitive.")
@Test
public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException {
// Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index a9f26a4..f16e0b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -73,6 +73,10 @@ import org.joda.time.Instant;
public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
public DoFn<InputT, OutputT> toDoFn() {
+ DoFn<InputT, OutputT> doFn = DoFnAdapters.getDoFn(this);
+ if (doFn != null) {
+ return doFn;
+ }
if (this instanceof RequiresWindowAccess) {
// No parameters as it just accesses `this`
return new AdaptedRequiresWindowAccessDoFn();
@@ -553,8 +557,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
private final DoFn<InputT, OutputT>.ProcessContext newContext;
- public AdaptedProcessContext(
- DoFn<InputT, OutputT>.ProcessContext newContext) {
+ public AdaptedProcessContext(DoFn<InputT, OutputT>.ProcessContext newContext) {
this.newContext = newContext;
}
@@ -632,21 +635,31 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
private class AdaptedDoFn extends DoFn<InputT, OutputT> {
+ @Setup
+ public void setup() throws Exception {
+ OldDoFn.this.setup();
+ }
+
@StartBundle
- public void startBundle(DoFn.Context c) throws Exception {
+ public void startBundle(Context c) throws Exception {
OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
}
@ProcessElement
- public void processElement(DoFn.ProcessContext c) throws Exception {
+ public void processElement(ProcessContext c) throws Exception {
OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c));
}
@FinishBundle
- public void finishBundle(DoFn.Context c) throws Exception {
+ public void finishBundle(Context c) throws Exception {
OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
}
+ @Teardown
+ public void teardown() throws Exception {
+ OldDoFn.this.teardown();
+ }
+
@Override
public Duration getAllowedTimestampSkew() {
return OldDoFn.this.getAllowedTimestampSkew();
[09/14] incubator-beam git commit: Fix binding of aggregator
delegation in DoFnAdapters
Posted by ke...@apache.org.
Fix binding of aggregator delegation in DoFnAdapters
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ab955d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ab955d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ab955d5
Branch: refs/heads/master
Commit: 2ab955d5501c87adea3fd17ad5dd1ad73be13364
Parents: 4e185d0
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 12:44:47 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ab955d5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 0b0d207..ca724cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -278,6 +278,7 @@ public class DoFnAdapters {
private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
fn.super();
this.context = context;
+ super.setupDelegateAggregators();
}
@Override
[08/14] incubator-beam git commit: Fix binding of aggregator creating
in OldDoFn
Posted by ke...@apache.org.
Fix binding of aggregator creation in OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4e185d0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4e185d0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4e185d0b
Branch: refs/heads/master
Commit: 4e185d0b0a7ec4c096380a25b9cbe4703621ec6b
Parents: 3094017
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 12:44:30 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e185d0b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index b269f47..a9f26a4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -511,6 +511,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
public AdaptedContext(
DoFn<InputT, OutputT>.Context newContext) {
this.newContext = newContext;
+ super.setupDelegateAggregators();
}
@Override
@@ -541,7 +542,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
@Override
protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return null;
+ return newContext.createAggregator(name, combiner);
}
}
@@ -625,7 +626,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
@Override
protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return null;
+ return newContext.createAggregator(name, combiner);
}
}
[14/14] incubator-beam git commit: This closes #1157
Posted by ke...@apache.org.
This closes #1157
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c390a2a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c390a2a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c390a2a7
Branch: refs/heads/master
Commit: c390a2a7ff1b3a58de213f85218eef689e712df4
Parents: 8dfadbf 2089c5c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 24 09:10:04 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 24 09:10:04 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/AggregatorFactory.java | 39 ++++
.../beam/runners/core/SimpleDoFnRunner.java | 8 +-
.../runners/direct/DoFnLifecycleManager.java | 42 ++--
.../beam/runners/direct/ParDoEvaluator.java | 3 +-
.../direct/ParDoMultiEvaluatorFactory.java | 6 +-
.../direct/ParDoSingleEvaluatorFactory.java | 5 +-
...leManagerRemovingTransformEvaluatorTest.java | 16 +-
.../direct/DoFnLifecycleManagerTest.java | 12 +-
.../direct/DoFnLifecycleManagersTest.java | 48 ++++-
.../direct/ParDoMultiEvaluatorFactoryTest.java | 11 +
.../direct/ParDoSingleEvaluatorFactoryTest.java | 11 +
.../beam/runners/direct/SplittableDoFnTest.java | 8 +-
.../functions/FlinkProcessContext.java | 10 +-
.../apache/beam/sdk/transforms/Aggregator.java | 11 +-
.../sdk/transforms/DelegatingAggregator.java | 125 +++++++++++
.../org/apache/beam/sdk/transforms/DoFn.java | 55 +++--
.../beam/sdk/transforms/DoFnAdapters.java | 1 +
.../apache/beam/sdk/transforms/DoFnTester.java | 18 +-
.../org/apache/beam/sdk/transforms/OldDoFn.java | 214 +++++++++----------
.../sdk/util/ReifyTimestampAndWindowsDoFn.java | 16 +-
.../org/apache/beam/sdk/util/StringUtils.java | 2 +-
.../DoFnDelegatingAggregatorTest.java | 5 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 16 +-
23 files changed, 463 insertions(+), 219 deletions(-)
----------------------------------------------------------------------
[07/14] incubator-beam git commit: Move shared DelegatingAggregator
out of OldDoFn
Posted by ke...@apache.org.
Move shared DelegatingAggregator out of OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/139437bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/139437bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/139437bd
Branch: refs/heads/master
Commit: 139437bdca8872a11f6a87a9f54347985523faf2
Parents: 0d500ef
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:45:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../sdk/transforms/DelegatingAggregator.java | 125 +++++++++++++++++++
.../org/apache/beam/sdk/transforms/DoFn.java | 30 ++---
.../org/apache/beam/sdk/transforms/OldDoFn.java | 97 --------------
.../DoFnDelegatingAggregatorTest.java | 5 +-
4 files changed, 142 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
new file mode 100644
index 0000000..d92bb71
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.MoreObjects;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * An {@link Aggregator} that delegates calls to {@link #addValue} to another aggregator.
+ *
+ * <p>This {@link Aggregator} is designed to be constructed without a delegate, at pipeline
+ * construction time, and serialized within a {@link DoFn}. The delegate aggregator to which it
+ * submits values must be provided by the runner at execution time.
+ *
+ * @param <AggInputT> the type of input element
+ * @param <AggOutputT> the type of output element
+ */
+class DelegatingAggregator<AggInputT, AggOutputT>
+ implements Aggregator<AggInputT, AggOutputT>, Serializable {
+ private final UUID id;
+
+ private final String name;
+
+ private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
+
+ private Aggregator<AggInputT, ?> delegate;
+
+ public DelegatingAggregator(String name,
+ CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+ this.id = UUID.randomUUID();
+ this.name = checkNotNull(name, "name cannot be null");
+ // Safe contravariant cast
+ @SuppressWarnings("unchecked")
+ CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
+ (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
+ this.combineFn = specificCombiner;
+ }
+
+ @Override
+ public void addValue(AggInputT value) {
+ if (delegate == null) {
+ throw new IllegalStateException(
+ String.format(
+ "addValue cannot be called on Aggregator outside of the execution of a %s.",
+ DoFn.class.getSimpleName()));
+ } else {
+ delegate.addValue(value);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
+ return combineFn;
+ }
+
+ /**
+ * Sets the current delegate of the Aggregator.
+ *
+ * @param delegate the delegate to set in this aggregator
+ */
+ public void setDelegate(Aggregator<AggInputT, ?> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("name", name)
+ .add("combineFn", combineFn)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, combineFn.getClass());
+ }
+
+ /**
+ * Indicates whether some other object is "equal to" this one.
+ *
+ * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
+ * CombineFns are the same class, and they have identical IDs.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null) {
+ return false;
+ }
+ if (o instanceof DelegatingAggregator) {
+ DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
+ return Objects.equals(this.id, that.id)
+ && Objects.equals(this.name, that.name)
+ && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 8b3aaf8..0531cbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -775,31 +775,31 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
}
/**
- * Returns an {@link Aggregator} with aggregation logic specified by the
- * {@link CombineFn} argument. The name provided must be unique across
- * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created
- * during pipeline construction.
+ * Returns an {@link Aggregator} with aggregation logic specified by the {@link CombineFn}
+ * argument. The name provided must be unique across {@link Aggregator}s created within the {@link
+ * DoFn}. Aggregators can only be created during pipeline construction.
*
* @param name the name of the aggregator
* @param combiner the {@link CombineFn} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this {@link DoFn}
+ * @return an aggregator for the provided name and combiner in the scope of this {@link DoFn}
* @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
+ * @throws IllegalArgumentException if the given name collides with another aggregator in this
+ * scope
* @throws IllegalStateException if called during pipeline execution.
*/
- public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+ public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
checkNotNull(name, "name cannot be null");
checkNotNull(combiner, "combiner cannot be null");
- checkArgument(!aggregators.containsKey(name),
+ checkArgument(
+ !aggregators.containsKey(name),
"Cannot create aggregator with name %s."
- + " An Aggregator with that name already exists within this scope.",
+ + " An Aggregator with that name already exists within this scope.",
name);
- checkState(!aggregatorsAreFinal,
+ checkState(
+ !aggregatorsAreFinal,
"Cannot create an aggregator during pipeline execution."
- + " Aggregators should be registered during pipeline construction.");
+ + " Aggregators should be registered during pipeline construction.");
DelegatingAggregator<AggInputT, AggOutputT> aggregator =
new DelegatingAggregator<>(name, combiner);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 72c2965..b269f47 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -21,14 +21,11 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.MoreObjects;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -505,100 +502,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
}
/**
- * An {@link Aggregator} that delegates calls to addValue to another
- * aggregator.
- *
- * @param <AggInputT> the type of input element
- * @param <AggOutputT> the type of output element
- */
- static class DelegatingAggregator<AggInputT, AggOutputT> implements
- Aggregator<AggInputT, AggOutputT>, Serializable {
- private final UUID id;
-
- private final String name;
-
- private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
- private Aggregator<AggInputT, ?> delegate;
-
- public DelegatingAggregator(String name,
- CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
- this.id = UUID.randomUUID();
- this.name = checkNotNull(name, "name cannot be null");
- // Safe contravariant cast
- @SuppressWarnings("unchecked")
- CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
- (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
- this.combineFn = specificCombiner;
- }
-
- @Override
- public void addValue(AggInputT value) {
- if (delegate == null) {
- throw new IllegalStateException(
- "addValue cannot be called on Aggregator outside of the execution of a OldDoFn.");
- } else {
- delegate.addValue(value);
- }
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
- return combineFn;
- }
-
- /**
- * Sets the current delegate of the Aggregator.
- *
- * @param delegate the delegate to set in this aggregator
- */
- public void setDelegate(Aggregator<AggInputT, ?> delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("name", name)
- .add("combineFn", combineFn)
- .toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, name, combineFn.getClass());
- }
-
- /**
- * Indicates whether some other object is "equal to" this one.
- *
- * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
- * CombineFns are the same class, and they have identical IDs.
- */
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (o == null) {
- return false;
- }
- if (o instanceof DelegatingAggregator) {
- DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
- return Objects.equals(this.id, that.id)
- && Objects.equals(this.name, that.name)
- && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
- }
- return false;
- }
- }
-
- /**
* A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
*/
private class AdaptedContext extends Context {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/139437bd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
index c072fd7..f51a6b0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -35,7 +34,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
- * Tests for {@link OldDoFn.DelegatingAggregator}.
+ * Tests for {@link DelegatingAggregator}.
*/
@RunWith(JUnit4.class)
public class DoFnDelegatingAggregatorTest {
@@ -63,7 +62,7 @@ public class DoFnDelegatingAggregatorTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("cannot be called");
- thrown.expectMessage("OldDoFn");
+ thrown.expectMessage("DoFn");
aggregator.addValue(21.2);
}
[03/14] incubator-beam git commit: Fix NPE in SimpleDoFnRunner
Posted by ke...@apache.org.
Fix NPE in SimpleDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d500efe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d500efe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d500efe
Branch: refs/heads/master
Commit: 0d500efe3b2d4248e2889dfcf36423aa473ee0a5
Parents: b235041
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:30:33 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0d500efe/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 4eda376..0360bc2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collection;
@@ -511,7 +512,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
@Override
public BoundedWindow window() {
- return null;
+ return Iterables.getOnlyElement(windows);
}
@Override
[06/14] incubator-beam git commit: Move AggregatorFactory to
runners-core and deprecate SDK version
Posted by ke...@apache.org.
Move AggregatorFactory to runners-core and deprecate SDK version
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08dd1498
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08dd1498
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08dd1498
Branch: refs/heads/master
Commit: 08dd14981bad95a029be8ac758a6091c55850200
Parents: 139437b
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 11:49:07 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/AggregatorFactory.java | 39 ++++++++++++++++++++
.../apache/beam/sdk/transforms/Aggregator.java | 11 ++----
2 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dd1498/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
new file mode 100644
index 0000000..153d30d
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+
+/**
+ * A factory for creating aggregators.
+ */
+public interface AggregatorFactory {
+ /**
+ * Create an aggregator with the given {@code name} and {@link CombineFn}.
+ *
+ * <p>This method is called to create an aggregator for a {@link DoFn}. It receives the
+ * class of the {@link DoFn} being executed and the context of the step it is being
+ * executed in.
+ */
+ <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+ Class<?> fnClass, ExecutionContext.StepContext stepContext,
+ String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dd1498/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index e8f6247..13bf322 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -73,8 +73,10 @@ public interface Aggregator<InputT, OutputT> {
CombineFn<InputT, ?, OutputT> getCombineFn();
/**
- * A factory for creating aggregators.
+ * @deprecated this is for use only by runners and exists only for a migration period. Please
+ * use the identical interface in org.apache.beam.runners.core
*/
+ @Deprecated
interface AggregatorFactory {
/**
* Create an aggregator with the given {@code name} and {@link CombineFn}.
@@ -87,11 +89,4 @@ public interface Aggregator<InputT, OutputT> {
Class<?> fnClass, ExecutionContext.StepContext stepContext,
String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine);
}
-
- // TODO: Consider the following additional API conveniences:
- // - In addition to createAggregator(), consider adding getAggregator() to
- // avoid the need to store the aggregator locally in a DoFn, i.e., create
- // if not already present.
- // - Add a shortcut for the most common aggregator:
- // c.createAggregator("name", new Sum.SumIntegerFn()).
}
[11/14] incubator-beam git commit: Fix flipped conditional in SimpleDoFnRunner
Posted by ke...@apache.org.
Fix flipped conditional in SimpleDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0c8d30d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0c8d30d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0c8d30d
Branch: refs/heads/master
Commit: f0c8d30d61c631fea642becde38b9cc52e873b5e
Parents: 043ebec
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 13:53:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0c8d30d/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 1cf56a6..dec9905 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -125,11 +125,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
@Override
public void processElement(WindowedValue<InputT> compressedElem) {
if (observesWindow) {
- invokeProcessElement(compressedElem);
- } else {
for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
invokeProcessElement(elem);
}
+ } else {
+ invokeProcessElement(compressedElem);
}
}
[10/14] incubator-beam git commit: Make DoFnTester aggregator initialization idempotent
Posted by ke...@apache.org.
Make DoFnTester aggregator initialization idempotent
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/043ebeca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/043ebeca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/043ebeca
Branch: refs/heads/master
Commit: 043ebecacf7a8e96939b025afa8480c6df2f3b41
Parents: 2ab955d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 13:35:29 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/043ebeca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 302bb02..7995719 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -543,6 +543,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private <AinT, AccT, AoutT> Aggregator<AinT, AoutT> aggregator(
final String name,
final CombineFn<AinT, AccT, AoutT> combiner) {
+
Aggregator<AinT, AoutT> aggregator = new Aggregator<AinT, AoutT>() {
@Override
public void addValue(AinT value) {
@@ -561,7 +562,22 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return combiner;
}
};
- accumulators.put(name, combiner.createAccumulator());
+
+ // Aggregator instantiation is idempotent
+ if (accumulators.containsKey(name)) {
+ Class<?> currentAccumClass = accumulators.get(name).getClass();
+ Class<?> createAccumClass = combiner.createAccumulator().getClass();
+ checkState(
+ currentAccumClass.isAssignableFrom(createAccumClass),
+ "Aggregator %s already initialized with accumulator type %s "
+ + "but was re-initialized with accumulator type %s",
+ name,
+ currentAccumClass,
+ createAccumClass);
+
+ } else {
+ accumulators.put(name, combiner.createAccumulator());
+ }
return aggregator;
}
[13/14] incubator-beam git commit: Make aggregator registration idempotent in FlinkRunner
Posted by ke...@apache.org.
Make aggregator registration idempotent in FlinkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2089c5cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2089c5cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2089c5cd
Branch: refs/heads/master
Commit: 2089c5cd2662a2eeea39ac7ebd1bfd8bcdc1aa16
Parents: 1919d8b
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 21:26:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:26:48 2016 -0700
----------------------------------------------------------------------
.../flink/translation/functions/FlinkProcessContext.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2089c5cd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index fa5eb1a..baf97cb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Iterables;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
@@ -316,7 +318,13 @@ class FlinkProcessContext<InputT, OutputT>
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
new SerializableFnAggregatorWrapper<>(combiner);
- runtimeContext.addAccumulator(name, wrapper);
+ Accumulator<?, ?> existingAccum =
+ (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
+ if (existingAccum != null) {
+ return wrapper;
+ } else {
+ runtimeContext.addAccumulator(name, wrapper);
+ }
return wrapper;
}
}
[02/14] incubator-beam git commit: Port some of ParDoTest to new DoFn
Posted by ke...@apache.org.
Port some of ParDoTest to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30940179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30940179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30940179
Branch: refs/heads/master
Commit: 3094017956913b583a9bd8be5ce685683b591669
Parents: c2e751f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 12:35:03 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/DoFn.java | 7 ++++---
.../java/org/apache/beam/sdk/util/StringUtils.java | 2 +-
.../org/apache/beam/sdk/transforms/ParDoTest.java | 16 +++++++---------
3 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 11ca853..018877f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -216,9 +215,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
/**
- * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this context.
+ * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this
+ * context.
*
- * <p>This method should be called by runners before the {@link StartBundle @StartBundle} method.
+ * <p>This method should be called by runners before the {@link StartBundle @StartBundle}
+ * method.
*/
@Experimental(Kind.AGGREGATOR)
protected final void setupDelegateAggregators() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
index 4f81eef..1c52c1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
@@ -96,7 +96,7 @@ public class StringUtils {
}
private static final String[] STANDARD_NAME_SUFFIXES =
- new String[]{"OldDoFn", "Fn"};
+ new String[]{"OldDoFn", "DoFn", "Fn"};
/**
* Pattern to match a non-anonymous inner class.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30940179/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 52244a0..d3ea9fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -51,12 +51,12 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.ParDo.Bound;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -90,13 +90,11 @@ public class ParDoTest implements Serializable {
@Rule
public transient ExpectedException thrown = ExpectedException.none();
- private static class PrintingOldDoFn extends OldDoFn<String, String> implements
- RequiresWindowAccess {
-
- @Override
- public void processElement(ProcessContext c) {
+ private static class PrintingDoFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
c.output(c.element() + ":" + c.timestamp().getMillis()
- + ":" + c.window().maxTimestamp().getMillis());
+ + ":" + window.maxTimestamp().getMillis());
}
}
@@ -848,7 +846,7 @@ public class ParDoTest implements Serializable {
output5.getName());
}
- assertEquals("ParDo(Printing)", ParDo.of(new PrintingOldDoFn()).getName());
+ assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName());
assertEquals(
"ParMultiDo(SideOutputDummy)",
@@ -1381,7 +1379,7 @@ public class ParDoTest implements Serializable {
System.out.println("Finish: 3");
}
}))
- .apply(ParDo.of(new PrintingOldDoFn()));
+ .apply(ParDo.of(new PrintingDoFn()));
PAssert.that(output).satisfies(new Checker());
[04/14] incubator-beam git commit: Port ReifyTimestampAndWindowsDoFn to RequiresWindowAccess
Posted by ke...@apache.org.
Port ReifyTimestampAndWindowsDoFn to RequiresWindowAccess
This should become either a DoFn or, probably more appropriately,
just something internal to the runners that actually require it to be
manifested as a DoFn at all.
As an intermediate migration step, this reduces how much it depends
on unsupported APIs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2350417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2350417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2350417
Branch: refs/heads/master
Commit: b2350417f73ae6c34f849ff0e93d5bd93df3088d
Parents: 164ee56
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 19:45:41 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/util/ReifyTimestampAndWindowsDoFn.java | 16 +++++-----------
1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2350417/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
index 8f3f540..6da4da0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.util;
import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.values.KV;
/**
@@ -28,20 +29,13 @@ import org.apache.beam.sdk.values.KV;
* @param <V> the type of the values of the input {@code PCollection}
*/
@SystemDoFnInternal
-public class ReifyTimestampAndWindowsDoFn<K, V>
- extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
+public class ReifyTimestampAndWindowsDoFn<K, V> extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>>
+ implements RequiresWindowAccess {
@Override
- public void processElement(ProcessContext c)
- throws Exception {
+ public void processElement(ProcessContext c) throws Exception {
KV<K, V> kv = c.element();
K key = kv.getKey();
V value = kv.getValue();
- c.output(KV.of(
- key,
- WindowedValue.of(
- value,
- c.timestamp(),
- c.windowingInternals().windows(),
- c.pane())));
+ c.output(KV.of(key, WindowedValue.of(value, c.timestamp(), c.window(), c.pane())));
}
}