You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/23 06:52:11 UTC
[15/50] incubator-beam git commit: DirectRunner: Expand ParDo.Bound into ParDo.BoundMulti
DirectRunner: Expand ParDo.Bound into ParDo.BoundMulti
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f86e98c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f86e98c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f86e98c9
Branch: refs/heads/gearpump-runner
Commit: f86e98c91cedbb1d9fd54e3268dfd8f014ac2f27
Parents: 34e2a35
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 16 15:43:47 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 15:09:43 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 3 +-
.../runners/direct/ParDoEvaluatorFactory.java | 56 +--
.../direct/ParDoMultiEvaluatorHooks.java | 55 ---
.../direct/ParDoMultiOverrideFactory.java | 51 +++
.../runners/direct/ParDoOverrideFactory.java | 53 ---
.../direct/ParDoSingleEvaluatorHooks.java | 58 ---
.../ParDoSingleViaMultiOverrideFactory.java | 66 +++
.../direct/TransformEvaluatorRegistry.java | 7 +-
.../direct/ParDoMultiEvaluatorHooksTest.java | 439 -------------------
.../direct/ParDoSingleEvaluatorHooksTest.java | 335 --------------
10 files changed, 139 insertions(+), 984 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index cce73c3..0060e84 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -87,7 +87,8 @@ public class DirectRunner
.put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
.put(TestStream.class, new DirectTestStreamFactory())
.put(Write.Bound.class, new WriteWithShardingFactory())
- .put(ParDo.Bound.class, new ParDoOverrideFactory())
+ .put(ParDo.Bound.class, new ParDoSingleViaMultiOverrideFactory())
+ .put(ParDo.BoundMulti.class, new ParDoMultiOverrideFactory())
.put(
GBKIntoKeyedWorkItems.class,
new DirectGBKIntoKeyedWorkItemsOverrideFactory())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index ee4987f..f126000 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -24,49 +24,22 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * A {@link TransformEvaluatorFactory} for {@link ParDo}-like primitive {@link PTransform
- * PTransforms}, parameterized by some {@link TransformHooks transform-specific handling}.
- */
-final class ParDoEvaluatorFactory<
- InputT,
- OutputT,
- TransformOutputT extends POutput,
- TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>>
- implements TransformEvaluatorFactory {
- interface TransformHooks<
- InputT,
- OutputT,
- TransformOutputT extends POutput,
- TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>> {
- /** Returns the {@link DoFn} contained in the given {@link ParDo} transform. */
- DoFn<InputT, OutputT> getDoFn(TransformT transform);
-
- /** Configures and creates a {@link ParDoEvaluator} for the given {@link DoFn}. */
- ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
- EvaluationContext evaluationContext,
- AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application,
- DirectStepContext stepContext,
- DoFn<InputT, OutputT> fnLocal);
- }
+/** A {@link TransformEvaluatorFactory} for {@link ParDo.BoundMulti}. */
+final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluatorFactory {
private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones;
private final EvaluationContext evaluationContext;
- private final TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks;
- ParDoEvaluatorFactory(
- EvaluationContext evaluationContext,
- TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks) {
+ ParDoEvaluatorFactory(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
- this.hooks = hooks;
fnClones =
CacheBuilder.newBuilder()
.build(
@@ -95,7 +68,8 @@ final class ParDoEvaluatorFactory<
@SuppressWarnings({"unchecked", "rawtypes"})
private TransformEvaluator<InputT> createEvaluator(
- AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application,
+ AppliedPTransform<PCollection<InputT>, PCollectionTuple, BoundMulti<InputT, OutputT>>
+ application,
CommittedBundle<InputT> inputBundle)
throws Exception {
String stepName = evaluationContext.getStepName(application);
@@ -104,12 +78,20 @@ final class ParDoEvaluatorFactory<
.getExecutionContext(application, inputBundle.getKey())
.getOrCreateStepContext(stepName, stepName);
- DoFnLifecycleManager fnManager =
- fnClones.getUnchecked(hooks.getDoFn(application.getTransform()));
+ DoFnLifecycleManager fnManager = fnClones.getUnchecked(application.getTransform().getNewFn());
try {
+ ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform();
return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
- hooks.createParDoEvaluator(
- evaluationContext, application, stepContext, (DoFn<InputT, OutputT>) fnManager.get()),
+ ParDoEvaluator.create(
+ evaluationContext,
+ stepContext,
+ application,
+ application.getInput().getWindowingStrategy(),
+ fnManager.get(),
+ transform.getSideInputs(),
+ transform.getMainOutputTag(),
+ transform.getSideOutputTags().getAll(),
+ application.getOutput().getAll()),
fnManager);
} catch (Exception e) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
deleted file mode 100644
index f30f209..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-
-/** Support for {@link ParDo.BoundMulti} in {@link ParDoEvaluatorFactory}. */
-class ParDoMultiEvaluatorHooks<InputT, OutputT>
- implements ParDoEvaluatorFactory.TransformHooks<
- InputT, OutputT, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> {
- @Override
- public DoFn<InputT, OutputT> getDoFn(ParDo.BoundMulti<InputT, OutputT> transform) {
- return transform.getNewFn();
- }
-
- @Override
- public ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
- EvaluationContext evaluationContext,
- AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>>
- application,
- DirectStepContext stepContext,
- DoFn<InputT, OutputT> fnLocal) {
- ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform();
- return ParDoEvaluator.create(
- evaluationContext,
- stepContext,
- application,
- application.getInput().getWindowingStrategy(),
- fnLocal,
- transform.getSideInputs(),
- transform.getMainOutputTag(),
- transform.getSideOutputTags().getAll(),
- application.getOutput().getAll());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
new file mode 100644
index 0000000..6cc3e6e
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+
+/**
+ * A {@link PTransformOverrideFactory} for {@link ParDo.BoundMulti} in the direct runner. A
+ * transform whose {@link DoFn} is a <a href="https://s.apache.org/splittable-do-fn">Splittable
+ * DoFn</a> is replaced by a {@link SplittableParDo}; any other transform is returned unchanged.
+ */
+class ParDoMultiOverrideFactory<InputT, OutputT>
+ implements PTransformOverrideFactory<
+ PCollection<? extends InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public PTransform<PCollection<? extends InputT>, PCollectionTuple> override(
+ ParDo.BoundMulti<InputT, OutputT> transform) {
+ // NOTE(review): only fn is passed to SplittableParDo, not side inputs/output tags; confirm.
+ DoFn<InputT, OutputT> fn = transform.getNewFn();
+ DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+ if (!signature.processElement().isSplittable()) {
+ return transform;
+ } else {
+ return new SplittableParDo(fn);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
deleted file mode 100644
index 27941f8..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
- * in the direct runner. Currently overrides applications of <a
- * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
- */
-class ParDoOverrideFactory<InputT, OutputT>
- implements PTransformOverrideFactory<
- PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> {
- @Override
- @SuppressWarnings("unchecked")
- public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> override(
- ParDo.Bound<InputT, OutputT> transform) {
- ParDo.Bound<InputT, OutputT> that = (ParDo.Bound<InputT, OutputT>) transform;
- DoFn<InputT, OutputT> fn = DoFnAdapters.getDoFn(that.getFn());
- if (fn == null) {
- // This is an OldDoFn, hence not splittable.
- return transform;
- }
- DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
- if (!signature.processElement().isSplittable()) {
- return transform;
- }
- return new SplittableParDo(fn);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
deleted file mode 100644
index 6d284c2..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
-import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-
-/** Support for {@link ParDo.Bound} in {@link ParDoEvaluatorFactory}. */
-class ParDoSingleEvaluatorHooks<InputT, OutputT>
- implements ParDoEvaluatorFactory.TransformHooks<
- InputT, OutputT, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> {
- @Override
- public DoFn<InputT, OutputT> getDoFn(ParDo.Bound<InputT, OutputT> transform) {
- return transform.getNewFn();
- }
-
- @Override
- public ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
- EvaluationContext evaluationContext,
- AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>>
- application,
- DirectStepContext stepContext,
- DoFn<InputT, OutputT> fnLocal) {
- TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
- ParDo.Bound<InputT, OutputT> transform = application.getTransform();
- return ParDoEvaluator.create(
- evaluationContext,
- stepContext,
- application,
- application.getInput().getWindowingStrategy(),
- fnLocal,
- transform.getSideInputs(),
- mainOutputTag,
- Collections.<TupleTag<?>>emptyList(),
- ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
new file mode 100644
index 0000000..ee3dfc5
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * A {@link PTransformOverrideFactory} that replaces a single-output {@link ParDo.Bound} with a
+ * {@link ParDoSingleViaMulti}, which applies the same fn and side inputs as a multi-output ParDo.
+ */
+class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
+    implements PTransformOverrideFactory<
+        PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> {
+  /** Returns a {@link ParDoSingleViaMulti} that wraps {@code transform}. */
+  @Override
+  public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> override(
+      ParDo.Bound<InputT, OutputT> transform) {
+    return new ParDoSingleViaMulti<>(transform);
+  }
+
+  /** Applies the wrapped {@link ParDo.Bound} as a multi-output {@link ParDo} with one output. */
+  static class ParDoSingleViaMulti<InputT, OutputT>
+      extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
+    private static final String MAIN_OUTPUT_TAG = "main";
+    private final ParDo.Bound<InputT, OutputT> underlyingParDo;
+
+    public ParDoSingleViaMulti(ParDo.Bound<InputT, OutputT> underlyingParDo) {
+      this.underlyingParDo = underlyingParDo;
+    }
+
+    /** Returns the main output of the multi-output application; no side outputs are declared. */
+    @Override
+    public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+      // Output tags for ParDo need only be unique up to applied transform
+      TupleTag<OutputT> mainOutputTag = new TupleTag<OutputT>(MAIN_OUTPUT_TAG);
+
+      PCollectionTuple output =
+          input.apply(
+              ParDo.of(underlyingParDo.getNewFn())
+                  .withSideInputs(underlyingParDo.getSideInputs())
+                  .withOutputTags(mainOutputTag, TupleTagList.empty()));
+
+      return output.get(mainOutputTag);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 51502f7..0514c3a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -49,12 +49,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
.put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
.put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
- .put(
- ParDo.Bound.class,
- new ParDoEvaluatorFactory<>(ctxt, new ParDoSingleEvaluatorHooks<>()))
- .put(
- ParDo.BoundMulti.class,
- new ParDoEvaluatorFactory<>(ctxt, new ParDoMultiEvaluatorHooks<>()))
+ .put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt))
.put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt))
.put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
.put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java
deleted file mode 100644
index 6302d37..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.Serializable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ParDoMultiEvaluatorHooks}.
- */
-@RunWith(JUnit4.class)
-public class ParDoMultiEvaluatorHooksTest implements Serializable {
- private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
-
- @Test
- public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
- TestPipeline p = TestPipeline.create();
-
- PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
- TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
- final TupleTag<String> elementTag = new TupleTag<>();
- final TupleTag<Integer> lengthTag = new TupleTag<>();
-
- BoundMulti<String, KV<String, Integer>> pardo =
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.<String, Integer>of(c.element(), c.element().length()));
- c.sideOutput(elementTag, c.element());
- c.sideOutput(lengthTag, c.element().length());
- }
- })
- .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
- PCollectionTuple outputTuple = input.apply(pardo);
-
- CommittedBundle<String> inputBundle =
- bundleFactory.createBundle(input).commit(Instant.now());
-
- PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
- PCollection<String> elementOutput = outputTuple.get(elementTag);
- PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
-
- EvaluationContext evaluationContext = mock(EvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle =
- bundleFactory.createBundle(mainOutput);
- UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
- UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createBundle(lengthOutput);
-
- when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
- when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
- when(evaluationContext.createBundle(lengthOutput)).thenReturn(lengthOutputBundle);
-
- DirectExecutionContext executionContext =
- new DirectExecutionContext(null, null, null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
- inputBundle.getKey())).thenReturn(executionContext);
- AggregatorContainer container = AggregatorContainer.create();
- AggregatorContainer.Mutator mutator = container.createMutator();
- when(evaluationContext.getAggregatorContainer()).thenReturn(container);
- when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
- TransformEvaluator<String> evaluator =
- new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
- .forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle);
-
- evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
- evaluator.processElement(
- WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
- evaluator.processElement(
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
- TransformResult result = evaluator.finishBundle();
- assertThat(
- result.getOutputBundles(),
- Matchers.<UncommittedBundle<?>>containsInAnyOrder(
- lengthOutputBundle, mainOutputBundle, elementOutputBundle));
- assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
- assertThat(result.getAggregatorChanges(), equalTo(mutator));
-
- assertThat(
- mainOutputBundle.commit(Instant.now()).getElements(),
- Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
- WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
- WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
- WindowedValue.valueInGlobalWindow(
- KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
- assertThat(
- elementOutputBundle.commit(Instant.now()).getElements(),
- Matchers.<WindowedValue<String>>containsInAnyOrder(
- WindowedValue.valueInGlobalWindow("foo"),
- WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
- assertThat(
- lengthOutputBundle.commit(Instant.now()).getElements(),
- Matchers.<WindowedValue<Integer>>containsInAnyOrder(
- WindowedValue.valueInGlobalWindow(3),
- WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
- WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
- }
-
- @Test
- public void testParDoMultiUndeclaredSideOutput() throws Exception {
- TestPipeline p = TestPipeline.create();
-
- PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
- TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
- final TupleTag<String> elementTag = new TupleTag<>();
- final TupleTag<Integer> lengthTag = new TupleTag<>();
-
- BoundMulti<String, KV<String, Integer>> pardo =
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.<String, Integer>of(c.element(), c.element().length()));
- c.sideOutput(elementTag, c.element());
- c.sideOutput(lengthTag, c.element().length());
- }
- })
- .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
- PCollectionTuple outputTuple = input.apply(pardo);
-
- CommittedBundle<String> inputBundle =
- bundleFactory.createBundle(input).commit(Instant.now());
-
- PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
- PCollection<String> elementOutput = outputTuple.get(elementTag);
-
- EvaluationContext evaluationContext = mock(EvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle =
- bundleFactory.createBundle(mainOutput);
- UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-
- when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
- when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-
- DirectExecutionContext executionContext =
- new DirectExecutionContext(null, null, null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
- inputBundle.getKey())).thenReturn(executionContext);
- AggregatorContainer container = AggregatorContainer.create();
- AggregatorContainer.Mutator mutator = container.createMutator();
- when(evaluationContext.getAggregatorContainer()).thenReturn(container);
- when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
- TransformEvaluator<String> evaluator =
- new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
- .forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle);
-
- evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
- evaluator.processElement(
- WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
- evaluator.processElement(
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
- TransformResult result = evaluator.finishBundle();
- assertThat(
- result.getOutputBundles(),
- Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
- assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
- assertThat(result.getAggregatorChanges(), equalTo(mutator));
-
- assertThat(
- mainOutputBundle.commit(Instant.now()).getElements(),
- Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
- WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
- WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
- WindowedValue.valueInGlobalWindow(
- KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
- assertThat(
- elementOutputBundle.commit(Instant.now()).getElements(),
- Matchers.<WindowedValue<String>>containsInAnyOrder(
- WindowedValue.valueInGlobalWindow("foo"),
- WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
- }
-
- /**
- * This test ignored, as today testing of GroupByKey is all the state that needs testing.
- * This should be ported to state when ready.
- */
- @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
- @Test
- public void finishBundleWithStatePutsStateInResult() throws Exception {
- TestPipeline p = TestPipeline.create();
-
- PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
- TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
- final TupleTag<String> elementTag = new TupleTag<>();
-
- final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
- StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow());
- final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
- final StateNamespace windowNs =
- StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
- BoundMulti<String, KV<String, Integer>> pardo =
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- private static final String STATE_ID = "my-state-id";
-
- @StateId(STATE_ID)
- private final StateSpec<Object, BagState<String>> bagSpec =
- StateSpecs.bag(StringUtf8Coder.of());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
- bagState.add(c.element());
- }
- })
- .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
- PCollectionTuple outputTuple = input.apply(pardo);
-
- CommittedBundle<String> inputBundle =
- bundleFactory.createBundle(input).commit(Instant.now());
-
- PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
- PCollection<String> elementOutput = outputTuple.get(elementTag);
-
- EvaluationContext evaluationContext = mock(EvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle =
- bundleFactory.createBundle(mainOutput);
- UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-
- when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
- when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-
- DirectExecutionContext executionContext = new DirectExecutionContext(null,
- StructuralKey.of("myKey", StringUtf8Coder.of()),
- null,
- null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
- inputBundle.getKey())).thenReturn(executionContext);
- AggregatorContainer container = AggregatorContainer.create();
- AggregatorContainer.Mutator mutator = container.createMutator();
- when(evaluationContext.getAggregatorContainer()).thenReturn(container);
- when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
- TransformEvaluator<String> evaluator =
- new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
- .forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle);
-
- evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
- evaluator.processElement(
- WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
- evaluator.processElement(
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
- TransformResult result = evaluator.finishBundle();
- assertThat(
- result.getOutputBundles(),
- Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
- assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L)));
- assertThat(result.getState(), not(nullValue()));
- assertThat(
- result.getState().state(StateNamespaces.global(), watermarkTag).read(),
- equalTo(new Instant(20205L)));
- assertThat(
- result.getState().state(windowNs, bagTag).read(),
- containsInAnyOrder("foo", "bara", "bazam"));
- }
-
- /**
- * This test ignored, as today testing of GroupByKey is all the state that needs testing.
- * This should be ported to state when ready.
- */
- @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
- @Test
- public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
- TestPipeline p = TestPipeline.create();
-
- PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
- TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
- final TupleTag<String> elementTag = new TupleTag<>();
-
- final TimerData addedTimer =
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(
- new Instant(0).plus(Duration.standardMinutes(5)),
- new Instant(1)
- .plus(Duration.standardMinutes(5))
- .plus(Duration.standardHours(1)))),
- new Instant(54541L),
- TimeDomain.EVENT_TIME);
- final TimerData deletedTimer =
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
- new Instant(3400000),
- TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
- BoundMulti<String, KV<String, Integer>> pardo =
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- private static final String EVENT_TIME_TIMER = "event-time-timer";
- private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
-
- @TimerId(EVENT_TIME_TIMER)
- TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
- @TimerId(SYNC_PROC_TIME_TIMER)
- TimerSpec syncProcTimerSpec =
- TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
- @ProcessElement
- public void processElement(
- ProcessContext c,
- @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
- @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
-
- eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
- syncProcTimeTimer.cancel();
- }
- })
- .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
- PCollectionTuple outputTuple = input.apply(pardo);
-
- CommittedBundle<String> inputBundle =
- bundleFactory.createBundle(input).commit(Instant.now());
-
- PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
- PCollection<String> elementOutput = outputTuple.get(elementTag);
-
- EvaluationContext evaluationContext = mock(EvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle =
- bundleFactory.createBundle(mainOutput);
- UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-
- when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
- when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-
- DirectExecutionContext executionContext = new DirectExecutionContext(null,
- StructuralKey.of("myKey", StringUtf8Coder.of()),
- null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
- inputBundle.getKey())).thenReturn(executionContext);
- AggregatorContainer container = AggregatorContainer.create();
- AggregatorContainer.Mutator mutator = container.createMutator();
- when(evaluationContext.getAggregatorContainer()).thenReturn(container);
- when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
- TransformEvaluator<String> evaluator =
- new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
- .forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle);
-
- evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
- evaluator.processElement(
- WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
- evaluator.processElement(
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
- TransformResult result = evaluator.finishBundle();
- assertThat(
- result.getTimerUpdate(),
- equalTo(
- TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
- .setTimer(addedTimer)
- .setTimer(addedTimer)
- .setTimer(addedTimer)
- .deletedTimer(deletedTimer)
- .deletedTimer(deletedTimer)
- .deletedTimer(deletedTimer)
- .build()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java
deleted file mode 100644
index 10cd7c5..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.Serializable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ParDoSingleEvaluatorHooks}.
- */
-@RunWith(JUnit4.class)
-public class ParDoSingleEvaluatorHooksTest implements Serializable {
- private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
-
- @Test
- public void testParDoInMemoryTransformEvaluator() throws Exception {
- TestPipeline p = TestPipeline.create();
-
- PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
- PCollection<Integer> collection =
- input.apply(
- ParDo.of(
- new DoFn<String, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().length());
- }
- }));
- CommittedBundle<String> inputBundle =
- bundleFactory.createBundle(input).commit(Instant.now());
-
- EvaluationContext evaluationContext = mock(EvaluationContext.class);
- UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
- when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
- DirectExecutionContext executionContext =
- new DirectExecutionContext(null, null, null, null);
- when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
- inputBundle.getKey())).thenReturn(executionContext);
- AggregatorContainer container = AggregatorContainer.create();
- AggregatorContainer.Mutator mutator = container.createMutator();
- when(evaluationContext.getAggregatorContainer()).thenReturn(container);
- when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
- org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
- new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
- .forApplication(
- collection.getProducingTransformInternal(), inputBundle);
-
- evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
- evaluator.processElement(
- WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
- evaluator.processElement(
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
- TransformResult result = evaluator.finishBundle();
- assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
- assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
- assertThat(result.getAggregatorChanges(), equalTo(mutator));
-
- assertThat(
- outputBundle.commit(Instant.now()).getElements(),
- Matchers.<WindowedValue<Integer>>containsInAnyOrder(
- WindowedValue.valueInGlobalWindow(3),
- WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
- WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
- }
-
- @Test
- public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception {
- TestPipeline p = TestPipeline.create();
-
- PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
- final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
- PCollection<Integer> collection =
- input.apply(
- ParDo.of(
- new DoFn<String, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.sideOutput(sideOutputTag, c.element().length());
- }
- }));
- CommittedBundle<String> inputBundle =
- bundleFactory.createBundle(input).commit(Instant.now());
-
- EvaluationContext evaluationContext = mock(EvaluationContext.class);
- UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
- when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
- DirectExecutionContext executionContext =
- new DirectExecutionContext(null, null, null, null);
- when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
- inputBundle.getKey())).thenReturn(executionContext);
- AggregatorContainer container = AggregatorContainer.create();
- AggregatorContainer.Mutator mutator = container.createMutator();
- when(evaluationContext.getAggregatorContainer()).thenReturn(container);
- when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
- TransformEvaluator<String> evaluator =
- new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
- .forApplication(
- collection.getProducingTransformInternal(), inputBundle);
-
- evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
- evaluator.processElement(
- WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
- evaluator.processElement(
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
- TransformResult result = evaluator.finishBundle();
- assertThat(
- result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
- assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
- assertThat(result.getAggregatorChanges(), equalTo(mutator));
- }
-
- /**
- * This test ignored, as today testing of GroupByKey is all the state that needs testing.
- * This should be ported to state when ready.
- */
- @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
- @Test
- public void finishBundleWithStatePutsStateInResult() throws Exception {
- TestPipeline p = TestPipeline.create();
-
- PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
- final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
- final StateNamespace windowNs =
- StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
- ParDo.Bound<String, KV<String, Integer>> pardo =
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- private static final String STATE_ID = "my-state-id";
-
- @StateId(STATE_ID)
- private final StateSpec<Object, BagState<String>> bagSpec =
- StateSpecs.bag(StringUtf8Coder.of());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
- bagState.add(c.element());
- }
- });
- PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
-
- CommittedBundle<String> inputBundle =
- bundleFactory.createBundle(input).commit(Instant.now());
-
- EvaluationContext evaluationContext = mock(EvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle =
- bundleFactory.createBundle(mainOutput);
-
- when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-
- DirectExecutionContext executionContext = new DirectExecutionContext(null,
- StructuralKey.of("myKey", StringUtf8Coder.of()),
- null, null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
- inputBundle.getKey()))
- .thenReturn(executionContext);
- AggregatorContainer container = AggregatorContainer.create();
- AggregatorContainer.Mutator mutator = container.createMutator();
- when(evaluationContext.getAggregatorContainer()).thenReturn(container);
- when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
- org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
- new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
- .forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle);
-
- evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
- evaluator.processElement(
- WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
- evaluator.processElement(
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
- TransformResult result = evaluator.finishBundle();
- assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
- assertThat(result.getState(), not(nullValue()));
- assertThat(
- result.getState().state(windowNs, bagTag).read(),
- containsInAnyOrder("foo", "bara", "bazam"));
- }
-
- /**
- * This test ignored, as today testing of GroupByKey is all the state that needs testing.
- * This should be ported to state when ready.
- */
- @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
- @Test
- public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
- TestPipeline p = TestPipeline.create();
-
- PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
- // TODO: this timer data is absolute, but the new API only support relative settings.
- // It will require adjustments when @Ignore is removed
- final TimerData addedTimer =
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(
- new Instant(0).plus(Duration.standardMinutes(5)),
- new Instant(1)
- .plus(Duration.standardMinutes(5))
- .plus(Duration.standardHours(1)))),
- new Instant(54541L),
- TimeDomain.EVENT_TIME);
- final TimerData deletedTimer =
- TimerData.of(
- StateNamespaces.window(
- IntervalWindow.getCoder(),
- new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
- new Instant(3400000),
- TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
- ParDo.Bound<String, KV<String, Integer>> pardo =
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- private static final String EVENT_TIME_TIMER = "event-time-timer";
- private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
-
- @TimerId(EVENT_TIME_TIMER)
- TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
- @TimerId(SYNC_PROC_TIME_TIMER)
- TimerSpec syncProcTimerSpec =
- TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
- @ProcessElement
- public void processElement(
- ProcessContext c,
- @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
- @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
- eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
- syncProcTimeTimer.cancel();
- }
- });
- PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
-
- StructuralKey<?> key = StructuralKey.of("myKey", StringUtf8Coder.of());
- CommittedBundle<String> inputBundle =
- bundleFactory.createBundle(input).commit(Instant.now());
-
- EvaluationContext evaluationContext = mock(EvaluationContext.class);
- UncommittedBundle<KV<String, Integer>> mainOutputBundle =
- bundleFactory.createBundle(mainOutput);
-
- when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-
- DirectExecutionContext executionContext = new DirectExecutionContext(null,
- key,
- null,
- null);
- when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
- inputBundle.getKey()))
- .thenReturn(executionContext);
- AggregatorContainer container = AggregatorContainer.create();
- AggregatorContainer.Mutator mutator = container.createMutator();
- when(evaluationContext.getAggregatorContainer()).thenReturn(container);
- when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
- TransformEvaluator<String> evaluator =
- new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
- .forApplication(
- mainOutput.getProducingTransformInternal(), inputBundle);
-
- evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-
- TransformResult result = evaluator.finishBundle();
- assertThat(result.getTimerUpdate(),
- equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
- .setTimer(addedTimer)
- .deletedTimer(deletedTimer)
- .build()));
- }
-}