You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/18 23:09:56 UTC

[1/6] incubator-beam git commit: This closes #1378

Repository: incubator-beam
Updated Branches:
  refs/heads/master 3548ffb06 -> d93e9a88b


This closes #1378


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d93e9a88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d93e9a88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d93e9a88

Branch: refs/heads/master
Commit: d93e9a88b0f0b3c7dc2461cbd11147b9ad8327cf
Parents: 3548ffb f86e98c
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Nov 18 15:09:43 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 15:09:43 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |   3 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |  56 +--
 .../direct/ParDoMultiEvaluatorHooks.java        |  55 ---
 .../direct/ParDoMultiOverrideFactory.java       |  51 +++
 .../runners/direct/ParDoOverrideFactory.java    |  53 ---
 .../direct/ParDoSingleEvaluatorHooks.java       |  58 ---
 .../ParDoSingleViaMultiOverrideFactory.java     |  66 +++
 .../direct/TransformEvaluatorRegistry.java      |   7 +-
 .../direct/ParDoMultiEvaluatorHooksTest.java    | 439 -------------------
 .../direct/ParDoSingleEvaluatorHooksTest.java   | 335 --------------
 .../org/apache/beam/sdk/transforms/DoFn.java    |   7 +-
 .../beam/sdk/transforms/DoFnAdapters.java       |   6 +
 .../org/apache/beam/sdk/transforms/OldDoFn.java |   7 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   2 +-
 .../beam/sdk/transforms/WithTimestamps.java     |   5 +-
 .../apache/beam/sdk/transforms/OldDoFnTest.java |  11 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  55 ++-
 17 files changed, 194 insertions(+), 1022 deletions(-)
----------------------------------------------------------------------



[5/6] incubator-beam git commit: DirectRunner: Expand ParDo.Bound into ParDo.BoundMulti

Posted by ke...@apache.org.
DirectRunner: Expand ParDo.Bound into ParDo.BoundMulti


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f86e98c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f86e98c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f86e98c9

Branch: refs/heads/master
Commit: f86e98c91cedbb1d9fd54e3268dfd8f014ac2f27
Parents: 34e2a35
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 16 15:43:47 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 15:09:43 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |   3 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |  56 +--
 .../direct/ParDoMultiEvaluatorHooks.java        |  55 ---
 .../direct/ParDoMultiOverrideFactory.java       |  51 +++
 .../runners/direct/ParDoOverrideFactory.java    |  53 ---
 .../direct/ParDoSingleEvaluatorHooks.java       |  58 ---
 .../ParDoSingleViaMultiOverrideFactory.java     |  66 +++
 .../direct/TransformEvaluatorRegistry.java      |   7 +-
 .../direct/ParDoMultiEvaluatorHooksTest.java    | 439 -------------------
 .../direct/ParDoSingleEvaluatorHooksTest.java   | 335 --------------
 10 files changed, 139 insertions(+), 984 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index cce73c3..0060e84 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -87,7 +87,8 @@ public class DirectRunner
               .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
               .put(TestStream.class, new DirectTestStreamFactory())
               .put(Write.Bound.class, new WriteWithShardingFactory())
-              .put(ParDo.Bound.class, new ParDoOverrideFactory())
+              .put(ParDo.Bound.class, new ParDoSingleViaMultiOverrideFactory())
+              .put(ParDo.BoundMulti.class, new ParDoMultiOverrideFactory())
               .put(
                   GBKIntoKeyedWorkItems.class,
                   new DirectGBKIntoKeyedWorkItemsOverrideFactory())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index ee4987f..f126000 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -24,49 +24,22 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * A {@link TransformEvaluatorFactory} for {@link ParDo}-like primitive {@link PTransform
- * PTransforms}, parameterized by some {@link TransformHooks transform-specific handling}.
- */
-final class ParDoEvaluatorFactory<
-        InputT,
-        OutputT,
-        TransformOutputT extends POutput,
-        TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>>
-    implements TransformEvaluatorFactory {
-  interface TransformHooks<
-      InputT,
-      OutputT,
-      TransformOutputT extends POutput,
-      TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>> {
-    /** Returns the {@link DoFn} contained in the given {@link ParDo} transform. */
-    DoFn<InputT, OutputT> getDoFn(TransformT transform);
-
-    /** Configures and creates a {@link ParDoEvaluator} for the given {@link DoFn}. */
-    ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
-        EvaluationContext evaluationContext,
-        AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application,
-        DirectStepContext stepContext,
-        DoFn<InputT, OutputT> fnLocal);
-  }
+/** A {@link TransformEvaluatorFactory} for {@link ParDo.BoundMulti}. */
+final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluatorFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
   private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones;
   private final EvaluationContext evaluationContext;
-  private final TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks;
 
-  ParDoEvaluatorFactory(
-      EvaluationContext evaluationContext,
-      TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks) {
+  ParDoEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
-    this.hooks = hooks;
     fnClones =
         CacheBuilder.newBuilder()
             .build(
@@ -95,7 +68,8 @@ final class ParDoEvaluatorFactory<
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   private TransformEvaluator<InputT> createEvaluator(
-      AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application,
+      AppliedPTransform<PCollection<InputT>, PCollectionTuple, BoundMulti<InputT, OutputT>>
+          application,
       CommittedBundle<InputT> inputBundle)
       throws Exception {
     String stepName = evaluationContext.getStepName(application);
@@ -104,12 +78,20 @@ final class ParDoEvaluatorFactory<
             .getExecutionContext(application, inputBundle.getKey())
             .getOrCreateStepContext(stepName, stepName);
 
-    DoFnLifecycleManager fnManager =
-        fnClones.getUnchecked(hooks.getDoFn(application.getTransform()));
+    DoFnLifecycleManager fnManager = fnClones.getUnchecked(application.getTransform().getNewFn());
     try {
+      ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform();
       return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
-          hooks.createParDoEvaluator(
-              evaluationContext, application, stepContext, (DoFn<InputT, OutputT>) fnManager.get()),
+          ParDoEvaluator.create(
+              evaluationContext,
+              stepContext,
+              application,
+              application.getInput().getWindowingStrategy(),
+              fnManager.get(),
+              transform.getSideInputs(),
+              transform.getMainOutputTag(),
+              transform.getSideOutputTags().getAll(),
+              application.getOutput().getAll()),
           fnManager);
     } catch (Exception e) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
deleted file mode 100644
index f30f209..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-
-/** Support for {@link ParDo.BoundMulti} in {@link ParDoEvaluatorFactory}. */
-class ParDoMultiEvaluatorHooks<InputT, OutputT>
-    implements ParDoEvaluatorFactory.TransformHooks<
-        InputT, OutputT, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> {
-  @Override
-  public DoFn<InputT, OutputT> getDoFn(ParDo.BoundMulti<InputT, OutputT> transform) {
-    return transform.getNewFn();
-  }
-
-  @Override
-  public ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
-      EvaluationContext evaluationContext,
-      AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>>
-          application,
-      DirectStepContext stepContext,
-      DoFn<InputT, OutputT> fnLocal) {
-    ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform();
-    return ParDoEvaluator.create(
-        evaluationContext,
-        stepContext,
-        application,
-        application.getInput().getWindowingStrategy(),
-        fnLocal,
-        transform.getSideInputs(),
-        transform.getMainOutputTag(),
-        transform.getSideOutputTags().getAll(),
-        application.getOutput().getAll());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
new file mode 100644
index 0000000..6cc3e6e
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+
+/**
+ * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
+ * in the direct runner. Currently overrides applications of <a
+ * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
+ */
+class ParDoMultiOverrideFactory<InputT, OutputT>
+    implements PTransformOverrideFactory<
+        PCollection<? extends InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> {
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public PTransform<PCollection<? extends InputT>, PCollectionTuple> override(
+      ParDo.BoundMulti<InputT, OutputT> transform) {
+
+    DoFn<InputT, OutputT> fn = transform.getNewFn();
+    DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+    if (!signature.processElement().isSplittable()) {
+      return transform;
+    } else {
+      return new SplittableParDo(fn);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
deleted file mode 100644
index 27941f8..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
- * in the direct runner. Currently overrides applications of <a
- * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
- */
-class ParDoOverrideFactory<InputT, OutputT>
-    implements PTransformOverrideFactory<
-        PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> {
-  @Override
-  @SuppressWarnings("unchecked")
-  public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> override(
-      ParDo.Bound<InputT, OutputT> transform) {
-    ParDo.Bound<InputT, OutputT> that = (ParDo.Bound<InputT, OutputT>) transform;
-    DoFn<InputT, OutputT> fn = DoFnAdapters.getDoFn(that.getFn());
-    if (fn == null) {
-      // This is an OldDoFn, hence not splittable.
-      return transform;
-    }
-    DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-    if (!signature.processElement().isSplittable()) {
-      return transform;
-    }
-    return new SplittableParDo(fn);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
deleted file mode 100644
index 6d284c2..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
-import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-
-/** Support for {@link ParDo.Bound} in {@link ParDoEvaluatorFactory}. */
-class ParDoSingleEvaluatorHooks<InputT, OutputT>
-    implements ParDoEvaluatorFactory.TransformHooks<
-        InputT, OutputT, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> {
-  @Override
-  public DoFn<InputT, OutputT> getDoFn(ParDo.Bound<InputT, OutputT> transform) {
-    return transform.getNewFn();
-  }
-
-  @Override
-  public ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
-      EvaluationContext evaluationContext,
-      AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>>
-          application,
-      DirectStepContext stepContext,
-      DoFn<InputT, OutputT> fnLocal) {
-    TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
-    ParDo.Bound<InputT, OutputT> transform = application.getTransform();
-    return ParDoEvaluator.create(
-        evaluationContext,
-        stepContext,
-        application,
-        application.getInput().getWindowingStrategy(),
-        fnLocal,
-        transform.getSideInputs(),
-        mainOutputTag,
-        Collections.<TupleTag<?>>emptyList(),
-        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
new file mode 100644
index 0000000..ee3dfc5
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * A {@link PTransformOverrideFactory} that overrides single-output {@link ParDo} to implement
+ * it in terms of multi-output {@link ParDo}.
+ */
+class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
+    implements PTransformOverrideFactory<
+        PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> {
+  @Override
+  @SuppressWarnings("unchecked")
+  public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> override(
+      ParDo.Bound<InputT, OutputT> transform) {
+    return new ParDoSingleViaMulti(transform);
+  }
+
+  static class ParDoSingleViaMulti<InputT, OutputT>
+      extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
+    private static final String MAIN_OUTPUT_TAG = "main";
+
+    private final ParDo.Bound<InputT, OutputT> underlyingParDo;
+
+    public ParDoSingleViaMulti(ParDo.Bound<InputT, OutputT> underlyingParDo) {
+      this.underlyingParDo = underlyingParDo;
+    }
+
+    @Override
+    public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+
+      // Output tags for ParDo need only be unique up to applied transform
+      TupleTag<OutputT> mainOutputTag = new TupleTag<OutputT>(MAIN_OUTPUT_TAG);
+
+      PCollectionTuple output =
+          input.apply(
+              ParDo.of(underlyingParDo.getNewFn())
+                  .withSideInputs(underlyingParDo.getSideInputs())
+                  .withOutputTags(mainOutputTag, TupleTagList.empty()));
+
+      return output.get(mainOutputTag);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 51502f7..0514c3a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -49,12 +49,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
         ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
             .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
             .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
-            .put(
-                ParDo.Bound.class,
-                new ParDoEvaluatorFactory<>(ctxt, new ParDoSingleEvaluatorHooks<>()))
-            .put(
-                ParDo.BoundMulti.class,
-                new ParDoEvaluatorFactory<>(ctxt, new ParDoMultiEvaluatorHooks<>()))
+            .put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt))
             .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt))
             .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
             .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java
deleted file mode 100644
index 6302d37..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.Serializable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ParDoMultiEvaluatorHooks}.
- */
-@RunWith(JUnit4.class)
-public class ParDoMultiEvaluatorHooksTest implements Serializable {
-  private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
-
-  @Test
-  public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-    final TupleTag<Integer> lengthTag = new TupleTag<>();
-
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
-                    c.sideOutput(elementTag, c.element());
-                    c.sideOutput(lengthTag, c.element().length());
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-    PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-    UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createBundle(lengthOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-    when(evaluationContext.createBundle(lengthOutput)).thenReturn(lengthOutputBundle);
-
-    DirectExecutionContext executionContext =
-        new DirectExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(
-            lengthOutputBundle, mainOutputBundle, elementOutputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getAggregatorChanges(), equalTo(mutator));
-
-    assertThat(
-        mainOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
-            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(
-                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(
-        elementOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<String>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow("foo"),
-            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
-            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(
-        lengthOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(3),
-            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-  }
-
-  @Test
-  public void testParDoMultiUndeclaredSideOutput() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-    final TupleTag<Integer> lengthTag = new TupleTag<>();
-
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
-                    c.sideOutput(elementTag, c.element());
-                    c.sideOutput(lengthTag, c.element().length());
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-
-    DirectExecutionContext executionContext =
-        new DirectExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getAggregatorChanges(), equalTo(mutator));
-
-    assertThat(
-        mainOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
-            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(
-                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(
-        elementOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<String>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow("foo"),
-            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
-            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-  }
-
-  /**
-   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
-   * This should be ported to state when ready.
-   */
-  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
-  @Test
-  public void finishBundleWithStatePutsStateInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-
-    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
-        StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow());
-    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
-    final StateNamespace windowNs =
-        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  private static final String STATE_ID = "my-state-id";
-
-                  @StateId(STATE_ID)
-                  private final StateSpec<Object, BagState<String>> bagSpec =
-                      StateSpecs.bag(StringUtf8Coder.of());
-
-                  @ProcessElement
-                  public void processElement(
-                      ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
-                    bagState.add(c.element());
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-
-    DirectExecutionContext executionContext = new DirectExecutionContext(null,
-        StructuralKey.of("myKey", StringUtf8Coder.of()),
-        null,
-        null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L)));
-    assertThat(result.getState(), not(nullValue()));
-    assertThat(
-        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
-        equalTo(new Instant(20205L)));
-    assertThat(
-        result.getState().state(windowNs, bagTag).read(),
-        containsInAnyOrder("foo", "bara", "bazam"));
-  }
-
-  /**
-   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
-   * This should be ported to state when ready.
-   */
-  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
-  @Test
-  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-
-    final TimerData addedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(
-                    new Instant(0).plus(Duration.standardMinutes(5)),
-                    new Instant(1)
-                        .plus(Duration.standardMinutes(5))
-                        .plus(Duration.standardHours(1)))),
-            new Instant(54541L),
-            TimeDomain.EVENT_TIME);
-    final TimerData deletedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
-            new Instant(3400000),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  private static final String EVENT_TIME_TIMER = "event-time-timer";
-                  private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
-
-                  @TimerId(EVENT_TIME_TIMER)
-                  TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-                  @TimerId(SYNC_PROC_TIME_TIMER)
-                  TimerSpec syncProcTimerSpec =
-                      TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-                  @ProcessElement
-                  public void processElement(
-                      ProcessContext c,
-                      @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
-                      @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
-
-                    eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
-                    syncProcTimeTimer.cancel();
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-
-    DirectExecutionContext executionContext = new DirectExecutionContext(null,
-        StructuralKey.of("myKey", StringUtf8Coder.of()),
-        null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getTimerUpdate(),
-        equalTo(
-            TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
-                .setTimer(addedTimer)
-                .setTimer(addedTimer)
-                .setTimer(addedTimer)
-                .deletedTimer(deletedTimer)
-                .deletedTimer(deletedTimer)
-                .deletedTimer(deletedTimer)
-                .build()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java
deleted file mode 100644
index 10cd7c5..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.Serializable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ParDoSingleEvaluatorHooks}.
- */
-@RunWith(JUnit4.class)
-public class ParDoSingleEvaluatorHooksTest implements Serializable {
-  private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
-
-  @Test
-  public void testParDoInMemoryTransformEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-    PCollection<Integer> collection =
-        input.apply(
-            ParDo.of(
-                new DoFn<String, Integer>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(c.element().length());
-                  }
-                }));
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
-    when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
-    DirectExecutionContext executionContext =
-        new DirectExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
-        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
-            .forApplication(
-                collection.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getAggregatorChanges(), equalTo(mutator));
-
-    assertThat(
-        outputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(3),
-            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-  }
-
-  @Test
-  public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-    final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
-    PCollection<Integer> collection =
-        input.apply(
-            ParDo.of(
-                new DoFn<String, Integer>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.sideOutput(sideOutputTag, c.element().length());
-                  }
-                }));
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
-    when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
-    DirectExecutionContext executionContext =
-        new DirectExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
-            .forApplication(
-                collection.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getAggregatorChanges(), equalTo(mutator));
-  }
-
-  /**
-   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
-   * This should be ported to state when ready.
-   */
-  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
-  @Test
-  public void finishBundleWithStatePutsStateInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
-    final StateNamespace windowNs =
-        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
-    ParDo.Bound<String, KV<String, Integer>> pardo =
-        ParDo.of(
-            new DoFn<String, KV<String, Integer>>() {
-              private static final String STATE_ID = "my-state-id";
-
-              @StateId(STATE_ID)
-              private final StateSpec<Object, BagState<String>> bagSpec =
-                  StateSpecs.bag(StringUtf8Coder.of());
-
-              @ProcessElement
-              public void processElement(
-                  ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
-                bagState.add(c.element());
-              }
-            });
-    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-
-    DirectExecutionContext executionContext = new DirectExecutionContext(null,
-        StructuralKey.of("myKey", StringUtf8Coder.of()),
-        null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey()))
-        .thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
-        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
-    assertThat(result.getState(), not(nullValue()));
-    assertThat(
-        result.getState().state(windowNs, bagTag).read(),
-        containsInAnyOrder("foo", "bara", "bazam"));
-  }
-
-  /**
-   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
-   * This should be ported to state when ready.
-   */
-  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
-  @Test
-  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    // TODO: this timer data is absolute, but the new API only support relative settings.
-    // It will require adjustments when @Ignore is removed
-    final TimerData addedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(
-                    new Instant(0).plus(Duration.standardMinutes(5)),
-                    new Instant(1)
-                        .plus(Duration.standardMinutes(5))
-                        .plus(Duration.standardHours(1)))),
-            new Instant(54541L),
-            TimeDomain.EVENT_TIME);
-    final TimerData deletedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
-            new Instant(3400000),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-    ParDo.Bound<String, KV<String, Integer>> pardo =
-        ParDo.of(
-            new DoFn<String, KV<String, Integer>>() {
-              private static final String EVENT_TIME_TIMER = "event-time-timer";
-              private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
-
-              @TimerId(EVENT_TIME_TIMER)
-              TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-              @TimerId(SYNC_PROC_TIME_TIMER)
-              TimerSpec syncProcTimerSpec =
-                  TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-              @ProcessElement
-              public void processElement(
-                  ProcessContext c,
-                  @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
-                  @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
-                eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
-                syncProcTimeTimer.cancel();
-              }
-            });
-    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
-
-    StructuralKey<?> key = StructuralKey.of("myKey", StringUtf8Coder.of());
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-
-    DirectExecutionContext executionContext = new DirectExecutionContext(null,
-        key,
-        null,
-        null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey()))
-        .thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(result.getTimerUpdate(),
-        equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
-            .setTimer(addedTimer)
-            .deletedTimer(deletedTimer)
-            .build()));
-  }
-}


[4/6] incubator-beam git commit: Do not override type descriptor in WithTimestamps

Posted by ke...@apache.org.
Do not override type descriptor in WithTimestamps


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6fa8057f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6fa8057f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6fa8057f

Branch: refs/heads/master
Commit: 6fa8057fe1db97615872915dcd81b2bca9b44e63
Parents: e92a157
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 17 11:07:32 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 15:09:43 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/WithTimestamps.java     | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6fa8057f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 00ac8e4..64e7c45 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -99,9 +99,8 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
 
   @Override
   public PCollection<T> apply(PCollection<T> input) {
-    return input
-        .apply("AddTimestamps", ParDo.of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew)))
-        .setTypeDescriptorInternal(input.getTypeDescriptor());
+    return input.apply(
+        "AddTimestamps", ParDo.of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew)));
   }
 
   private static class AddTimestampsDoFn<T> extends DoFn<T, T> {


[3/6] incubator-beam git commit: Separate ParDoTest cases and make them more flexible

Posted by ke...@apache.org.
Separate ParDoTest cases and make them more flexible

A number of excessively rigid name tests preclude runner expansion
of ParDo. This change makes them into independent unit tests
for better signal and makes them more accurate to the intent - the
name should have the relevant information, but may have other
content.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e92a157a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e92a157a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e92a157a

Branch: refs/heads/master
Commit: e92a157aa41746df225379b71cc88695c8e2d93f
Parents: 3548ffb
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 17 10:14:07 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 15:09:43 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 55 ++++++++++----------
 1 file changed, 27 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e92a157a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 26f5570..3c3e266 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -27,6 +27,7 @@ import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -826,39 +827,37 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  public void testParDoGetName() {
+  public void testParDoOutputNameBasedOnDoFnWithTrimmedSuffix() {
     Pipeline p = TestPipeline.create();
+    PCollection<String> output = p.apply(Create.of(1)).apply(ParDo.of(new TestOldDoFn()));
+    assertThat(output.getName(), containsString("ParDo(Test)"));
+  }
 
-    PCollection<Integer> input =
-        p.apply(Create.of(Arrays.asList(3, -42, 666)))
-        .setName("MyInput");
-
-    {
-      PCollection<String> output1 = input.apply(ParDo.of(new TestOldDoFn()));
-      assertEquals("ParDo(Test).out", output1.getName());
-    }
-
-    {
-      PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestOldDoFn()));
-      assertEquals("MyParDo.out", output2.getName());
-    }
-
-    {
-      PCollection<String> output4 = input.apply("TestOldDoFn", ParDo.of(new TestOldDoFn()));
-      assertEquals("TestOldDoFn.out", output4.getName());
-    }
+  @Test
+  public void testParDoOutputNameBasedOnLabel() {
+    Pipeline p = TestPipeline.create();
+    PCollection<String> output =
+        p.apply(Create.of(1)).apply("MyParDo", ParDo.of(new TestOldDoFn()));
+    assertThat(output.getName(), containsString("MyParDo"));
+  }
 
-    {
-      PCollection<String> output5 = input.apply(ParDo.of(new StrangelyNamedDoer()));
-      assertEquals("ParDo(StrangelyNamedDoer).out",
-          output5.getName());
-    }
+  @Test
+  public void testParDoOutputNameBasedDoFnWithoutMatchingSuffix() {
+    Pipeline p = TestPipeline.create();
+    PCollection<String> output = p.apply(Create.of(1)).apply(ParDo.of(new StrangelyNamedDoer()));
+    assertThat(output.getName(), containsString("ParDo(StrangelyNamedDoer)"));
+  }
 
-    assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName());
+  @Test
+  public void testParDoTransformNameBasedDoFnWithTrimmedSuffix() {
+    assertThat(ParDo.of(new PrintingDoFn()).getName(), containsString("ParDo(Printing)"));
+  }
 
-    assertEquals(
-        "ParMultiDo(SideOutputDummy)",
-        ParDo.of(new SideOutputDummyFn(null)).withOutputTags(null, null).getName());
+  @Test
+  public void testParDoMultiNameBasedDoFnWithTrimmerSuffix() {
+    assertThat(
+        ParDo.of(new SideOutputDummyFn(null)).withOutputTags(null, null).getName(),
+        containsString("ParMultiDo(SideOutputDummy)"));
   }
 
   @Test


[2/6] incubator-beam git commit: Use getNewFn for coder inferences in ParDo

Posted by ke...@apache.org.
Use getNewFn for coder inferences in ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34e2a352
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34e2a352
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34e2a352

Branch: refs/heads/master
Commit: 34e2a3525643f98ddcee5889b2d0d232e2e05509
Parents: b0d46c2
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 17 11:08:48 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 15:09:43 2016 -0800

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34e2a352/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index ac1bccb..215ae6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -1094,7 +1094,7 @@ public class ParDo {
       Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
       return input.getPipeline().getCoderRegistry().getDefaultCoder(
           output.getTypeDescriptor(),
-          getOldFn().getInputTypeDescriptor(),
+          getNewFn().getInputTypeDescriptor(),
           inputCoder);
       }
 


[6/6] incubator-beam git commit: Delegate getAggregators() in various DoFn adapters

Posted by ke...@apache.org.
Delegate getAggregators() in various DoFn adapters


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0d46c2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0d46c2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0d46c2d

Branch: refs/heads/master
Commit: b0d46c2deb4318f8d0e55eeeb20e1d11ceadd218
Parents: 6fa8057
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 17 15:50:17 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Nov 18 15:09:43 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/transforms/DoFn.java   |  7 ++++++-
 .../org/apache/beam/sdk/transforms/DoFnAdapters.java     |  6 ++++++
 .../java/org/apache/beam/sdk/transforms/OldDoFn.java     |  7 ++++++-
 .../java/org/apache/beam/sdk/transforms/OldDoFnTest.java | 11 +++++++----
 4 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9978ef4..221d942 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -28,6 +28,8 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -236,7 +238,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
 
       aggregator.setDelegate(delegate);
     }
-
   }
 
   /**
@@ -298,6 +299,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
 
   protected Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
 
+  Collection<Aggregator<?, ?>> getAggregators() {
+    return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
+  }
+
   /**
    * Protects aggregators from being created after initialization.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index a3466bb..1a74ae7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import java.io.IOException;
+import java.util.Collection;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
@@ -244,6 +245,11 @@ public class DoFnAdapters {
     }
 
     @Override
+    Collection<Aggregator<?, ?>> getAggregators() {
+      return fn.getAggregators();
+    }
+
+    @Override
     public Duration getAllowedTimestampSkew() {
       return fn.getAllowedTimestampSkew();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index f16e0b3..9bf9003 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -676,6 +676,11 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     }
 
     @Override
+    Collection<Aggregator<?, ?>> getAggregators() {
+      return OldDoFn.this.getAggregators();
+    }
+
+    @Override
     protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
       return OldDoFn.this.getOutputTypeDescriptor();
     }
@@ -683,7 +688,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
 
   /**
    * A {@link ProcessContext} for an {@link OldDoFn} that implements
-   * {@link OldDoFn.RequiresWindowAcccess}, via a context for a proper {@link DoFn}.
+   * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}.
    */
   private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d46c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
index e7ae135..07e3078 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.AggregatorValues;
@@ -37,6 +36,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
 import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -216,14 +216,17 @@ public class OldDoFnTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
 
     CountOddsFn countOdds = new CountOddsFn();
-    pipeline
+    PCollection<Void> output = pipeline
         .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
         .apply(ParDo.of(countOdds));
     PipelineResult result = pipeline.run();
 
     AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator);
-    assertThat(values.getValuesAtSteps(),
-        equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4)));
+
+    Map<String, Integer> valuesMap = values.getValuesAtSteps();
+
+    assertThat(valuesMap.size(), equalTo(1));
+    assertThat(valuesMap.get(output.getProducingTransformInternal().getFullName()), equalTo(4));
   }
 
   private static class CountOddsFn extends OldDoFn<Integer, Void> {