You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/08 03:04:38 UTC
[5/6] incubator-beam git commit: BEAM-261 Make translators package
private.
BEAM-261 Make translators package private.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5553c603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5553c603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5553c603
Branch: refs/heads/apex-runner
Commit: 5553c603a0c48855d38d4702f19e905eac2034f2
Parents: 9197d1e
Author: Thomas Weise <th...@apache.org>
Authored: Thu Oct 27 16:19:15 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Nov 7 22:33:46 2016 +0100
----------------------------------------------------------------------
.../runners/apex/ApexPipelineTranslator.java | 185 -------
.../apache/beam/runners/apex/ApexRunner.java | 13 +-
.../beam/runners/apex/ApexRunnerResult.java | 2 +-
.../translation/ApexPipelineTranslator.java | 179 +++++++
.../translation/CreateValuesTranslator.java | 48 ++
.../FlattenPCollectionTranslator.java | 129 +++++
.../apex/translation/GroupByKeyTranslator.java | 42 ++
.../translation/ParDoBoundMultiTranslator.java | 142 ++++++
.../apex/translation/ParDoBoundTranslator.java | 64 +++
.../translation/ReadUnboundedTranslator.java | 42 ++
.../apex/translation/TransformTranslator.java | 31 ++
.../apex/translation/TranslationContext.java | 178 +++++++
.../operators/ApexFlattenOperator.java | 125 +++++
.../operators/ApexGroupByKeyOperator.java | 478 +++++++++++++++++++
.../operators/ApexParDoOperator.java | 375 +++++++++++++++
.../ApexReadUnboundedInputOperator.java | 155 ++++++
.../translation/operators/package-info.java | 22 +
.../runners/apex/translation/package-info.java | 22 +
.../translation/utils/ApexStateInternals.java | 438 +++++++++++++++++
.../apex/translation/utils/ApexStreamTuple.java | 222 +++++++++
.../utils/CoderAdapterStreamCodec.java | 69 +++
.../apex/translation/utils/NoOpStepContext.java | 72 +++
.../utils/SerializablePipelineOptions.java | 60 +++
.../utils/ValueAndCoderKryoSerializable.java | 77 +++
.../apex/translation/utils/ValuesSource.java | 149 ++++++
.../apex/translation/utils/package-info.java | 22 +
.../translators/CreateValuesTranslator.java | 48 --
.../FlattenPCollectionTranslator.java | 129 -----
.../apex/translators/GroupByKeyTranslator.java | 42 --
.../translators/ParDoBoundMultiTranslator.java | 142 ------
.../apex/translators/ParDoBoundTranslator.java | 64 ---
.../translators/ReadUnboundedTranslator.java | 42 --
.../apex/translators/TransformTranslator.java | 31 --
.../apex/translators/TranslationContext.java | 178 -------
.../functions/ApexFlattenOperator.java | 125 -----
.../functions/ApexGroupByKeyOperator.java | 478 -------------------
.../functions/ApexParDoOperator.java | 375 ---------------
.../translators/functions/package-info.java | 22 -
.../io/ApexReadUnboundedInputOperator.java | 154 ------
.../apex/translators/io/ValuesSource.java | 149 ------
.../apex/translators/io/package-info.java | 22 -
.../runners/apex/translators/package-info.java | 22 -
.../translators/utils/ApexStateInternals.java | 438 -----------------
.../apex/translators/utils/ApexStreamTuple.java | 222 ---------
.../utils/CoderAdapterStreamCodec.java | 69 ---
.../apex/translators/utils/NoOpStepContext.java | 72 ---
.../utils/SerializablePipelineOptions.java | 60 ---
.../utils/ValueAndCoderKryoSerializable.java | 77 ---
.../apex/translators/utils/package-info.java | 22 -
.../translation/ApexGroupByKeyOperatorTest.java | 112 +++++
.../FlattenPCollectionTranslatorTest.java | 99 ++++
.../translation/GroupByKeyTranslatorTest.java | 246 ++++++++++
.../translation/ParDoBoundTranslatorTest.java | 340 +++++++++++++
.../translation/ReadUnboundTranslatorTest.java | 129 +++++
.../utils/ApexStateInternalsTest.java | 361 ++++++++++++++
.../translation/utils/CollectionSource.java | 136 ++++++
.../translation/utils/PipelineOptionsTest.java | 84 ++++
.../translators/ApexGroupByKeyOperatorTest.java | 112 -----
.../FlattenPCollectionTranslatorTest.java | 99 ----
.../translators/GroupByKeyTranslatorTest.java | 246 ----------
.../translators/ParDoBoundTranslatorTest.java | 340 -------------
.../translators/ReadUnboundTranslatorTest.java | 129 -----
.../utils/ApexStateInternalsTest.java | 361 --------------
.../translators/utils/CollectionSource.java | 136 ------
.../translators/utils/PipelineOptionsTest.java | 84 ----
65 files changed, 4653 insertions(+), 4685 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
deleted file mode 100644
index 8a87ce0..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex;
-
-import com.datatorrent.api.DAG;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
-import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
-import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
-import org.apache.beam.runners.apex.translators.GroupByKeyTranslator;
-import org.apache.beam.runners.apex.translators.ParDoBoundMultiTranslator;
-import org.apache.beam.runners.apex.translators.ParDoBoundTranslator;
-import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator;
-import org.apache.beam.runners.apex.translators.TransformTranslator;
-import org.apache.beam.runners.apex.translators.TranslationContext;
-import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link ApexPipelineTranslator} translates {@link Pipeline} objects
- * into Apex logical plan {@link DAG}.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
- private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
-
- /**
- * A map from {@link PTransform} subclass to the corresponding
- * {@link TransformTranslator} to use to translate that transform.
- */
- private static final Map<Class<? extends PTransform>, TransformTranslator>
- transformTranslators = new HashMap<>();
-
- private final TranslationContext translationContext;
-
- static {
- // register TransformTranslators
- registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
- registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator<>());
- registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
- registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
- registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
- registerTransformTranslator(Flatten.FlattenPCollectionList.class,
- new FlattenPCollectionTranslator());
- registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
- registerTransformTranslator(CreateApexPCollectionView.class,
- new CreateApexPCollectionViewTranslator());
- registerTransformTranslator(CreatePCollectionView.class,
- new CreatePCollectionViewTranslator());
- }
-
- public ApexPipelineTranslator(TranslationContext translationContext) {
- this.translationContext = translationContext;
- }
-
- public void translate(Pipeline pipeline) {
- pipeline.traverseTopologically(this);
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
- LOG.debug("entering composite transform {}", node.getTransform());
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void leaveCompositeTransform(TransformTreeNode node) {
- LOG.debug("leaving composite transform {}", node.getTransform());
- }
-
- @Override
- public void visitPrimitiveTransform(TransformTreeNode node) {
- LOG.debug("visiting transform {}", node.getTransform());
- PTransform transform = node.getTransform();
- TransformTranslator translator = getTransformTranslator(transform.getClass());
- if (null == translator) {
- throw new UnsupportedOperationException(
- "no translator registered for " + transform);
- }
- translationContext.setCurrentTransform(node);
- translator.translate(transform, translationContext);
- }
-
- @Override
- public void visitValue(PValue value, TransformTreeNode producer) {
- LOG.debug("visiting value {}", value);
- }
-
- /**
- * Records that instances of the specified PTransform class
- * should be translated by default by the corresponding
- * {@link TransformTranslator}.
- */
- private static <TransformT extends PTransform> void registerTransformTranslator(
- Class<TransformT> transformClass,
- TransformTranslator<? extends TransformT> transformTranslator) {
- if (transformTranslators.put(transformClass, transformTranslator) != null) {
- throw new IllegalArgumentException(
- "defining multiple translators for " + transformClass);
- }
- }
-
- /**
- * Returns the {@link TransformTranslator} to use for instances of the
- * specified PTransform class, or null if none registered.
- */
- private <TransformT extends PTransform<?, ?>>
- TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
- return transformTranslators.get(transformClass);
- }
-
- private static class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void translate(Read.Bounded<T> transform, TranslationContext context) {
- // TODO: adapter is visibleForTesting
- BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
- transform.getSource());
- ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
- unboundedSource, true, context.getPipelineOptions());
- context.addOperator(operator, operator.output);
- }
-
- }
-
- private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
- implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void translate(CreateApexPCollectionView<ElemT, ViewT> transform,
- TranslationContext context) {
- PCollectionView<ViewT> view = transform.getView();
- context.addView(view);
- LOG.debug("view {}", view.getName());
- }
- }
-
- private static class CreatePCollectionViewTranslator<ElemT, ViewT>
- implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void translate(CreatePCollectionView<ElemT, ViewT> transform,
- TranslationContext context) {
- PCollectionView<ViewT> view = transform.getView();
- context.addView(view);
- LOG.debug("view {}", view.getName());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 661308d..b42dddf 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -29,7 +29,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.runners.apex.translators.TranslationContext;
+import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
@@ -118,17 +118,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
@Override
- public ApexRunnerResult run(Pipeline pipeline) {
+ public ApexRunnerResult run(final Pipeline pipeline) {
- final TranslationContext translationContext = new TranslationContext(options);
- ApexPipelineTranslator translator = new ApexPipelineTranslator(translationContext);
- translator.translate(pipeline);
+ final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
StreamingApplication apexApp = new StreamingApplication() {
@Override
public void populateDAG(DAG dag, Configuration conf) {
dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
- translationContext.populateDAG(dag);
+ translator.translate(pipeline, dag);
}
};
@@ -352,9 +350,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
/**
* Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- *
- * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
- * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
* They require the input {@link PCollection} fits in memory.
* For a large {@link PCollection} this is expected to crash!
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 03428a6..3ae69f2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -79,7 +79,7 @@ public class ApexRunnerResult implements PipelineResult {
/**
* Return the DAG executed by the pipeline.
- * @return
+ * @return DAG from translation.
*/
public DAG getApexDAG() {
return apexDAG;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
new file mode 100644
index 0000000..d38faf7
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import com.datatorrent.api.DAG;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a {@link Pipeline} into an Apex logical plan {@link DAG}: {@link #translate}
+ * visits every primitive transform, then populates the DAG from the translation context.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
+ private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
+
+ /**
+ * Translator per supported {@link PTransform} class, looked up by exact class; a primitive
+ * transform without an entry fails translation with an UnsupportedOperationException.
+ */
+ private static final Map<Class<? extends PTransform>, TransformTranslator>
+ transformTranslators = new HashMap<>();
+
+ private final TranslationContext translationContext;
+
+ static {
+ // register the TransformTranslator for each supported primitive transform
+ registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+ registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator<>());
+ registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+ registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
+ registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
+ registerTransformTranslator(Flatten.FlattenPCollectionList.class,
+ new FlattenPCollectionTranslator());
+ registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+ registerTransformTranslator(CreateApexPCollectionView.class,
+ new CreateApexPCollectionViewTranslator());
+ registerTransformTranslator(CreatePCollectionView.class,
+ new CreatePCollectionViewTranslator());
+ }
+
+ public ApexPipelineTranslator(ApexPipelineOptions options) {
+ this.translationContext = new TranslationContext(options);
+ }
+
+ public void translate(Pipeline pipeline, DAG dag) {
+ pipeline.traverseTopologically(this);
+ translationContext.populateDAG(dag);
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+ LOG.debug("entering composite transform {}", node.getTransform());
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformTreeNode node) {
+ LOG.debug("leaving composite transform {}", node.getTransform());
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformTreeNode node) {
+ LOG.debug("visiting transform {}", node.getTransform());
+ PTransform transform = node.getTransform();
+ TransformTranslator translator = getTransformTranslator(transform.getClass());
+ if (null == translator) {
+ throw new UnsupportedOperationException(
+ "no translator registered for " + transform);
+ }
+ translationContext.setCurrentTransform(node);
+ translator.translate(transform, translationContext);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformTreeNode producer) {
+ LOG.debug("visiting value {}", value);
+ }
+
+ /**
+ * Registers the {@link TransformTranslator} for instances of the given PTransform class.
+ * Invoked from the static initializer; throws {@link IllegalArgumentException} if a
+ * translator is already registered for that class.
+ */
+ private static <TransformT extends PTransform> void registerTransformTranslator(
+ Class<TransformT> transformClass,
+ TransformTranslator<? extends TransformT> transformTranslator) {
+ if (transformTranslators.put(transformClass, transformTranslator) != null) {
+ throw new IllegalArgumentException(
+ "defining multiple translators for " + transformClass);
+ }
+ }
+
+ /**
+ * Returns the {@link TransformTranslator} registered for exactly the given PTransform
+ * class (subclasses are not matched), or null if none is registered.
+ */
+ private <TransformT extends PTransform<?, ?>>
+ TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
+ return transformTranslators.get(transformClass);
+ }
+
+ private static class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void translate(Read.Bounded<T> transform, TranslationContext context) {
+ // TODO: adapter is visibleForTesting
+ BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
+ transform.getSource());
+ ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+ unboundedSource, true, context.getPipelineOptions());
+ context.addOperator(operator, operator.output);
+ }
+
+ }
+
+ private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
+ implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void translate(CreateApexPCollectionView<ElemT, ViewT> transform,
+ TranslationContext context) {
+ PCollectionView<ViewT> view = transform.getView();
+ context.addView(view);
+ LOG.debug("view {}", view.getName());
+ }
+ }
+
+ private static class CreatePCollectionViewTranslator<ElemT, ViewT>
+ implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void translate(CreatePCollectionView<ElemT, ViewT> transform,
+ TranslationContext context) {
+ PCollectionView<ViewT> view = transform.getView();
+ context.addView(view);
+ LOG.debug("view {}", view.getName());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
new file mode 100644
index 0000000..ceae2b5
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+
+
+/**
+ * Translates {@link Create.Values} by wrapping its elements in a {@link ValuesSource} that is
+ * read by an {@link ApexReadUnboundedInputOperator}; mainly used for testing.
+ */
+class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
+ private static final long serialVersionUID = 1451000241832745629L;
+
+ @Override
+ public void translate(Create.Values<T> transform, TranslationContext context) {
+ try {
+ UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
+ transform.getDefaultOutputCoder((PBegin) context.getInput()));
+ ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+ unboundedSource, context.getPipelineOptions());
+ context.addOperator(operator, operator.output);
+ } catch (CannotProvideCoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
new file mode 100644
index 0000000..eb24af9
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.translation.operators.ApexFlattenOperator;
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+/**
+ * Translates {@link Flatten.FlattenPCollectionList} into a cascade of Apex merge operators.
+ */
+class FlattenPCollectionTranslator<T> implements
+ TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+ PCollectionList<T> input = context.getInput();
+ List<PCollection<T>> collections = input.getAll();
+
+ if (collections.isEmpty()) {
+ // create a dummy source that never emits anything
+ @SuppressWarnings("unchecked")
+ UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST,
+ (Coder<T>) VoidCoder.of());
+ ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+ unboundedSource, context.getPipelineOptions());
+ context.addOperator(operator, operator.output);
+ } else {
+ PCollection<T> output = context.getOutput();
+ Map<PCollection<?>, Integer> unionTags = Collections.emptyMap();
+ flattenCollections(collections, unionTags, output, context);
+ }
+ }
+
+ /**
+ * Flatten the given collections into the given result collection. Translates
+ * into a cascading merge with 2 input ports per operator. The optional union
+ * tags identify the source in the result stream, used to channel multiple side
+ * inputs to a single Apex operator port. NOTE(review): a single input collection
+ * adds no operator, so nothing produces finalCollection in that case - confirm.
+ * @param collections inputs to merge; all must share one coder, else UnsupportedOperationException
+ * @param unionTags union tag per input collection; untagged collections use tag 0
+ * @param finalCollection collection produced by the last merge operator
+ * @param context translation context that operators and streams are added to
+ */
+ static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>,
+ Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) {
+ List<PCollection<T>> remainingCollections = Lists.newArrayList();
+ PCollection<T> firstCollection = null;
+ while (!collections.isEmpty()) {
+ for (PCollection<T> collection : collections) {
+ if (null == firstCollection) {
+ firstCollection = collection;
+ } else {
+ ApexFlattenOperator<T> operator = new ApexFlattenOperator<>();
+ context.addStream(firstCollection, operator.data1);
+ Integer unionTag = unionTags.get(firstCollection);
+ operator.data1Tag = (unionTag != null) ? unionTag : 0;
+ context.addStream(collection, operator.data2);
+ unionTag = unionTags.get(collection);
+ operator.data2Tag = (unionTag != null) ? unionTag : 0;
+
+ if (!collection.getCoder().equals(firstCollection.getCoder())) {
+ throw new UnsupportedOperationException("coders don't match");
+ }
+
+ if (collections.size() > 2) {
+ PCollection<T> intermediateCollection = intermediateCollection(collection,
+ collection.getCoder());
+ context.addOperator(operator, operator.out, intermediateCollection);
+ remainingCollections.add(intermediateCollection);
+ } else {
+ // final stream merge
+ context.addOperator(operator, operator.out, finalCollection);
+ }
+ firstCollection = null;
+ }
+ }
+ if (firstCollection != null) {
+ // push to next merge level
+ remainingCollections.add(firstCollection);
+ firstCollection = null;
+ }
+ if (remainingCollections.size() > 1) {
+ collections = remainingCollections;
+ remainingCollections = Lists.newArrayList();
+ } else {
+ collections = Lists.newArrayList();
+ }
+ }
+ }
+
+ static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
+ PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+ input.getWindowingStrategy(), input.isBounded());
+ output.setCoder(outputCoder);
+ return output;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..47d447a
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link GroupByKey} translation to Apex operator.
+ */
+class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates an {@link ApexGroupByKeyOperator}, registers its output port as the producer of the
+   * transform's output and connects the transform's input collection to its input port.
+   */
+  @Override
+  public void translate(GroupByKey<K, V> transform, TranslationContext context) {
+    PCollection<KV<K, V>> kvInput = context.getInput();
+    ApexGroupByKeyOperator<K, V> groupByKeyOperator = new ApexGroupByKeyOperator<>(
+        context.getPipelineOptions(), kvInput, context.<K>stateInternalsFactory());
+    context.addOperator(groupByKeyOperator, groupByKeyOperator.output);
+    context.addStream(kvInput, groupByKeyOperator.input);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
new file mode 100644
index 0000000..7c91b91
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.OutputPort;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
+ */
+class ParDoBoundMultiTranslator<InputT, OutputT>
+    implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslator.class);
+
+  /**
+   * Translates the transform into a single {@link ApexParDoOperator}. The main output tag is
+   * mapped to the operator's main output port; each side output tag is mapped to the side
+   * output port at the same position in {@code transform.getSideOutputTags()}. The main input
+   * and any side inputs are then connected to the operator.
+   *
+   * @throws IllegalStateException if an output tag is neither the main output tag nor one of
+   *     the transform's side output tags
+   */
+  @Override
+  public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
+    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    PCollectionTuple output = context.getOutput();
+    PCollection<InputT> input = context.getInput();
+    List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+    List<TupleTag<?>> sideOutputTags = transform.getSideOutputTags().getAll();
+    Coder<InputT> inputCoder = input.getCoder();
+    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
+        context.getPipelineOptions(),
+        doFn, transform.getMainOutputTag(), sideOutputTags,
+        input.getWindowingStrategy(), sideInputs, wvInputCoder,
+        context.<Void>stateInternalsFactory()
+        );
+
+    Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
+    Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
+    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
+      TupleTag<?> tag = outputEntry.getKey();
+      // TupleTag defines value equality; do not rely on the tags being the same instances.
+      if (tag.equals(transform.getMainOutputTag())) {
+        ports.put(outputEntry.getValue(), operator.output);
+      } else {
+        // side output ports are positional: port i serves sideOutputTags.get(i)
+        int portIndex = sideOutputTags.indexOf(tag);
+        if (portIndex < 0) {
+          throw new IllegalStateException("no output port for tag " + tag);
+        }
+        ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]);
+      }
+    }
+    context.addOperator(operator, ports);
+    context.addStream(input, operator.input);
+    if (!sideInputs.isEmpty()) {
+      addSideInputs(operator, sideInputs, context);
+    }
+  }
+
+  /**
+   * Connects the side inputs of a ParDo to the operator's side input port(s). The operator has
+   * a fixed number of side input ports, and each port accepts only one stream. When there are
+   * more side inputs than ports, they are first merged into a single union stream.
+   */
+  static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
+      TranslationContext context) {
+    Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
+    if (sideInputs.size() > sideInputPorts.length) {
+      PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
+      context.addStream(unionCollection, sideInputPorts[0]);
+    } else {
+      for (int i = 0; i < sideInputs.size(); i++) {
+        context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
+      }
+    }
+  }
+
+  /**
+   * Flattens the collections backing {@code sideInputs} into one intermediate collection.
+   * Each tuple is tagged with the index of its side input in the list, used as the union tag.
+   *
+   * @throws IllegalArgumentException if fewer than two side inputs are given
+   * @throws UnsupportedOperationException if the side inputs use different coders
+   */
+  private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
+      TranslationContext context) {
+    checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
+    // flatten and assign union tag
+    List<PCollection<Object>> sourceCollections = new ArrayList<>();
+    Map<PCollection<?>, Integer> unionTags = new HashMap<>();
+    PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0));
+    for (int i = 0; i < sideInputs.size(); i++) {
+      PCollection<Object> sideInputCollection = context.getViewInput(sideInputs.get(i));
+      if (!sideInputCollection.getWindowingStrategy().equals(
+          firstSideInput.getWindowingStrategy())) {
+        // TODO: check how to handle differing windowing strategies in the stream codec;
+        // until then this is only logged rather than rejected.
+        LOG.warn("Side inputs union with different windowing strategies {} {}",
+            firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
+      }
+      if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
+        String msg = "Multiple side inputs with different coders.";
+        throw new UnsupportedOperationException(msg);
+      }
+      sourceCollections.add(sideInputCollection);
+      unionTags.put(sideInputCollection, i);
+    }
+
+    PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
+        firstSideInput, firstSideInput.getCoder());
+    FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
+        context);
+    return resultCollection;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
new file mode 100644
index 0000000..c1ebbd5
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import java.util.List;
+
+import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
+ */
+class ParDoBoundTranslator<InputT, OutputT> implements
+    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates an {@link ApexParDoOperator} with a fresh main output tag and no side outputs.
+   * It registers the operator's main output port as the producer of the transform's output,
+   * then connects the main input and any side inputs.
+   */
+  @Override
+  public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
+    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    PCollection<OutputT> output = context.getOutput();
+    PCollection<InputT> input = context.getInput();
+    List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+    Coder<InputT> inputCoder = input.getCoder();
+    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
+        context.getPipelineOptions(),
+        doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
+        output.getWindowingStrategy(), sideInputs, wvInputCoder,
+        context.<Void>stateInternalsFactory()
+        );
+    context.addOperator(operator, operator.output);
+    context.addStream(input, operator.input);
+    if (!sideInputs.isEmpty()) {
+      ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
new file mode 100644
index 0000000..b3034ac
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import com.datatorrent.api.InputOperator;
+
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+/**
+ * {@link Read.Unbounded} is translated to Apex {@link InputOperator}
+ * that wraps {@link UnboundedSource}.
+ */
+class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Wraps the transform's {@link UnboundedSource} in an {@link ApexReadUnboundedInputOperator}
+   * and registers that operator's output port as the producer of the transform's output.
+   */
+  @Override
+  public void translate(Read.Unbounded<T> transform, TranslationContext context) {
+    UnboundedSource<T, ?> source = transform.getSource();
+    ApexReadUnboundedInputOperator<T, ?> inputOperator =
+        new ApexReadUnboundedInputOperator<>(source, context.getPipelineOptions());
+    context.addOperator(inputOperator, inputOperator.output);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
new file mode 100644
index 0000000..eb81052
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Translates {@link PTransform} to Apex functions.
+ */
+interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable {
+
+  /**
+   * Translates {@code transform} into Apex operators and the streams between them.
+   *
+   * @param transform the transform to translate
+   * @param context the translation context that receives the created operators and streams
+   */
+  void translate(T transform, TranslationContext context);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
new file mode 100644
index 0000000..e016730
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s. It tracks:
+ * <ul>
+ *   <li>the transform currently being translated;</li>
+ *   <li>the operators created so far;</li>
+ *   <li>the output/input ports connected by each collection's stream;</li>
+ *   <li>the inputs backing each {@link PCollectionView}.</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+class TranslationContext {
+
+  private final ApexPipelineOptions pipelineOptions;
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>();
+  private final Map<String, Operator> operators = new HashMap<>();
+  private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>();
+
+  TranslationContext(ApexPipelineOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
+  }
+
+  /**
+   * Records the input of the current transform as the data backing {@code view}.
+   */
+  public void addView(PCollectionView<?> view) {
+    this.viewInputs.put(view, this.getInput());
+  }
+
+  /**
+   * Returns the input previously recorded for {@code view} via {@link #addView}.
+   *
+   * @throws IllegalArgumentException if no input was recorded for the view
+   */
+  public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) {
+    PInput input = this.viewInputs.get(view);
+    // template form: the message is only built when the check fails
+    checkArgument(input != null, "unknown view %s", view.getName());
+    return (InputT) input;
+  }
+
+  /**
+   * Makes {@code treeNode} the current transform. Subsequent {@link #getInput},
+   * {@link #getOutput} and {@link #addOperator} calls refer to it.
+   */
+  public void setCurrentTransform(TransformTreeNode treeNode) {
+    this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
+        treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
+  }
+
+  public ApexPipelineOptions getPipelineOptions() {
+    return pipelineOptions;
+  }
+
+  /** Returns the input of the current transform, cast to the caller's expected type. */
+  public <InputT extends PInput> InputT getInput() {
+    return (InputT) getCurrentTransform().getInput();
+  }
+
+  /** Returns the output of the current transform, cast to the caller's expected type. */
+  public <OutputT extends POutput> OutputT getOutput() {
+    return (OutputT) getCurrentTransform().getOutput();
+  }
+
+  private AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    checkArgument(currentTransform != null, "current transform not set");
+    return currentTransform;
+  }
+
+  /**
+   * Registers {@code operator} with {@code port} as the producer of the current transform's
+   * output collection.
+   */
+  public void addOperator(Operator operator, OutputPort port) {
+    addOperator(operator, port, this.<PCollection<?>>getOutput());
+  }
+
+  /**
+   * Registers the operator and its output ports for the given collections.
+   *
+   * <p>The operator itself is registered together with the first entry. The remaining entries
+   * only register a stream for their collection. If {@code ports} is empty, nothing is
+   * registered.
+   *
+   * @param operator the operator producing the collections
+   * @param ports output port producing each collection
+   */
+  public void addOperator(Operator operator, Map<PCollection<?>, OutputPort<?>> ports) {
+    boolean first = true;
+    for (Map.Entry<PCollection<?>, OutputPort<?>> portEntry : ports.entrySet()) {
+      if (first) {
+        addOperator(operator, portEntry.getValue(), portEntry.getKey());
+        first = false;
+      } else {
+        this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(),
+            new ArrayList<>()));
+      }
+    }
+  }
+
+  /**
+   * Adds the operator with its output port for the given result {@link PCollection}.
+   *
+   * <p>The Apex DAG requires unique operator names. The operator is named after the current
+   * transform's full name. If that name is taken, an increasing numeric suffix (1, 2, ...) is
+   * appended.
+   *
+   * @param operator the operator to add
+   * @param port the operator's output port producing {@code output}
+   * @param output the collection produced by {@code port}; any stream previously registered
+   *     for it is replaced
+   */
+  public void addOperator(Operator operator, OutputPort port, PCollection output) {
+    String baseName = getCurrentTransform().getFullName();
+    String name = baseName;
+    for (int i = 1; this.operators.containsKey(name); i++) {
+      name = baseName + i;
+    }
+    this.operators.put(name, operator);
+    this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>()));
+  }
+
+  /**
+   * Connects {@code inputPort} as a consumer of the stream that produces {@code input}.
+   *
+   * @throws IllegalArgumentException if no operator has been registered as producer of
+   *     {@code input}
+   */
+  public void addStream(PInput input, InputPort inputPort) {
+    Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
+    checkArgument(stream != null, "no upstream operator defined for %s", input);
+    stream.getRight().add(inputPort);
+  }
+
+  /**
+   * Adds all registered operators to {@code dag}, then adds the streams.
+   *
+   * <p>Every collection with at least one consumer becomes a stream from its producing output
+   * port to those input ports. Each consuming input port gets a stream codec built from the
+   * collection's coder. Collections without consumers are not added.
+   */
+  public void populateDAG(DAG dag) {
+    for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) {
+      dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());
+    }
+    int streamIndex = 0;
+    for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry
+        : this.streams.entrySet()) {
+      List<InputPort<?>> sinksList = streamEntry.getValue().getRight();
+      if (sinksList.isEmpty()) {
+        continue;
+      }
+      InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]);
+      dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks);
+      // the coder depends only on the collection, so build it once per stream
+      Coder<Object> wrapperCoder = streamTupleCoder(streamEntry.getKey());
+      for (InputPort port : sinks) {
+        // each sink port still receives its own codec instance
+        CoderAdapterStreamCodec streamCodec = new CoderAdapterStreamCodec(wrapperCoder);
+        dag.setInputPortAttribute(port, PortContext.STREAM_CODEC, streamCodec);
+      }
+    }
+  }
+
+  /**
+   * Builds the coder for the {@link ApexStreamTuple}s carrying elements of {@code pc}. The
+   * collection's coder is wrapped in a {@link FullWindowedValueCoder} when a windowing strategy
+   * is set.
+   */
+  private static Coder<Object> streamTupleCoder(PCollection pc) {
+    Coder coder = pc.getCoder();
+    if (pc.getWindowingStrategy() != null) {
+      coder = FullWindowedValueCoder.of(pc.getCoder(),
+          pc.getWindowingStrategy().getWindowFn().windowCoder()
+          );
+    }
+    return ApexStreamTuple.ApexStreamTupleCoder.of(coder);
+  }
+
+  /**
+   * Return the {@link StateInternalsFactory} for the pipeline translation.
+   *
+   * @return a new {@link ApexStateInternals.ApexStateInternalsFactory} on every call
+   */
+  public <K> StateInternalsFactory<K> stateInternalsFactory() {
+    return new ApexStateInternals.ApexStateInternalsFactory();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
new file mode 100644
index 0000000..3d9db51
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.WatermarkTuple;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex operator for Beam {@link org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList}.
+ */
+public class ApexFlattenOperator<InputT> extends BaseOperator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
+ private boolean traceTuples = false;
+
+ private long inputWM1;
+ private long inputWM2;
+ private long outputWM;
+
+ public int data1Tag;
+ public int data2Tag;
+
+ /**
+ * Data input port 1.
+ */
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 =
+ new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
+ /**
+ * Emits to port "out"
+ */
+ @Override
+ public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
+ if (tuple instanceof WatermarkTuple) {
+ WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
+ if (wmTuple.getTimestamp() > inputWM1) {
+ inputWM1 = wmTuple.getTimestamp();
+ if (inputWM1 <= inputWM2) {
+ // move output watermark and emit it
+ outputWM = inputWM1;
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", outputWM);
+ }
+ out.emit(tuple);
+ }
+ }
+ return;
+ }
+ if (traceTuples) {
+ LOG.debug("\nemitting {}\n", tuple);
+ }
+
+ if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
+ ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data1Tag);
+ }
+ out.emit(tuple);
+ }
+ };
+
+ /**
+ * Data input port 2.
+ */
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 =
+ new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
+ /**
+ * Emits to port "out"
+ */
+ @Override
+ public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
+ if (tuple instanceof WatermarkTuple) {
+ WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
+ if (wmTuple.getTimestamp() > inputWM2) {
+ inputWM2 = wmTuple.getTimestamp();
+ if (inputWM2 <= inputWM1) {
+ // move output watermark and emit it
+ outputWM = inputWM2;
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", outputWM);
+ }
+ out.emit(tuple);
+ }
+ }
+ return;
+ }
+ if (traceTuples) {
+ LOG.debug("\nemitting {}\n", tuple);
+ }
+
+ if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
+ ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data2Tag);
+ }
+ out.emit(tuple);
+ }
+ };
+
+ /**
+ * Output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out =
+ new DefaultOutputPort<>();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
new file mode 100644
index 0000000..1b5e693
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex operator for Beam {@link GroupByKey}.
+ * This operator expects the input stream already partitioned by K,
+ * which is determined by the {@link StreamCodec} on the input port.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ApexGroupByKeyOperator<K, V> implements Operator {
+ private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
+ private boolean traceTuples = true;
+
+ @Bind(JavaSerializer.class)
+ private WindowingStrategy<V, BoundedWindow> windowingStrategy;
+ @Bind(JavaSerializer.class)
+ private Coder<K> keyCoder;
+ @Bind(JavaSerializer.class)
+ private Coder<V> valueCoder;
+
+ @Bind(JavaSerializer.class)
+ private final SerializablePipelineOptions serializedOptions;
+ @Bind(JavaSerializer.class)
+ private final StateInternalsFactory<K> stateInternalsFactory;
+ private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
+ private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+ private transient ProcessContext context;
+ private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
+ private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
+ private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
+ new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() {
+ @Override
+ public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) {
+ try {
+ if (t instanceof ApexStreamTuple.WatermarkTuple) {
+ ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>) t;
+ processWatermark(mark);
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", mark.getTimestamp());
+ }
+ output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(
+ mark.getTimestamp()));
+ return;
+ }
+ if (traceTuples) {
+ LOG.debug("\ninput {}\n", t.getValue());
+ }
+ processElement(t.getValue());
+ } catch (Exception e) {
+ Throwables.propagateIfPossible(e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>>
+ output = new DefaultOutputPort<>();
+
+ @SuppressWarnings("unchecked")
+ public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input,
+ StateInternalsFactory<K> stateInternalsFactory) {
+ checkNotNull(pipelineOptions);
+ this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
+ this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy();
+ this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
+ this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
+ this.stateInternalsFactory = stateInternalsFactory;
+ }
+
+ @SuppressWarnings("unused") // for Kryo
+ private ApexGroupByKeyOperator() {
+ this.serializedOptions = null;
+ this.stateInternalsFactory = null;
+ }
+
+ @Override
+ public void beginWindow(long l) {
+ }
+
+ @Override
+ public void endWindow() {
+ }
+
+ @Override
+ public void setup(OperatorContext context) {
+ this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
+ StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
+ this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy,
+ stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
+ this.context = new ProcessContext(fn, this.timerInternals);
+ }
+
+ @Override
+ public void teardown() {
+ }
+
+ /**
+ * Returns the list of timers that are ready to fire. These are the timers
+ * that are registered to be triggered at a time before the current watermark.
+ * We keep these timers in a Set, so that they are deduplicated, as the same
+ * timer can be registered multiple times.
+ */
+ private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(
+ long currentWatermark) {
+
+ // we keep the timers to return in a different list and launch them later
+ // because we cannot prevent a trigger from registering another trigger,
+ // which would lead to concurrent modification exception.
+ Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();
+
+ Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it =
+ activeTimers.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+
+ Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
+ while (timerIt.hasNext()) {
+ TimerInternals.TimerData timerData = timerIt.next();
+ if (timerData.getTimestamp().isBefore(currentWatermark)) {
+ toFire.put(keyWithTimers.getKey(), timerData);
+ timerIt.remove();
+ }
+ }
+
+ if (keyWithTimers.getValue().isEmpty()) {
+ it.remove();
+ }
+ }
+ return toFire;
+ }
+
+ private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
+ final KV<K, V> kv = windowedValue.getValue();
+ final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(),
+ windowedValue.getTimestamp(),
+ windowedValue.getWindows(),
+ windowedValue.getPane());
+
+ KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
+ kv.getKey(),
+ Collections.singletonList(updatedWindowedValue));
+
+ context.setElement(kwi, getStateInternalsForKey(kwi.key()));
+ fn.processElement(context);
+ }
+
+ private StateInternals<K> getStateInternalsForKey(K key) {
+ final ByteBuffer keyBytes;
+ try {
+ keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
+ if (stateInternals == null) {
+ stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+ perKeyStateInternals.put(keyBytes, stateInternals);
+ }
+ return stateInternals;
+ }
+
+ private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
+ final ByteBuffer keyBytes;
+ try {
+ keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
+ if (timersForKey == null) {
+ timersForKey = new HashSet<>();
+ }
+ timersForKey.add(timer);
+ activeTimers.put(keyBytes, timersForKey);
+ }
+
+ private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
+ final ByteBuffer keyBytes;
+ try {
+ keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
+ if (timersForKey != null) {
+ timersForKey.remove(timer);
+ if (timersForKey.isEmpty()) {
+ activeTimers.remove(keyBytes);
+ } else {
+ activeTimers.put(keyBytes, timersForKey);
+ }
+ }
+ }
+
+ private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
+ this.inputWatermark = new Instant(mark.getTimestamp());
+ Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(
+ mark.getTimestamp());
+ if (!timers.isEmpty()) {
+ for (ByteBuffer keyBytes : timers.keySet()) {
+ K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+ KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
+ context.setElement(kwi, getStateInternalsForKey(kwi.key()));
+ fn.processElement(context);
+ }
+ }
+ }
+
+ private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?,
+ KeyedWorkItem<K, V>>.ProcessContext {
+
+ private final ApexTimerInternals timerInternals;
+ private StateInternals<K> stateInternals;
+ private KeyedWorkItem<K, V> element;
+
+ public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
+ ApexTimerInternals timerInternals) {
+ function.super();
+ this.timerInternals = checkNotNull(timerInternals);
+ }
+
+ public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
+ this.element = element;
+ this.stateInternals = stateForKey;
+ }
+
+ @Override
+ public KeyedWorkItem<K, V> element() {
+ return this.element;
+ }
+
+ @Override
+ public Instant timestamp() {
+ throw new UnsupportedOperationException(
+ "timestamp() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return serializedOptions.get();
+ }
+
+ @Override
+ public void output(KV<K, Iterable<V>> output) {
+ throw new UnsupportedOperationException(
+ "output() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) {
+ throw new UnsupportedOperationException(
+ "outputWithTimestamp() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public PaneInfo pane() {
+ throw new UnsupportedOperationException(
+ "pane() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new UnsupportedOperationException(
+ "window() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() {
+ return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() {
+
+ @Override
+ public StateInternals<K> stateInternals() {
+ return stateInternals;
+ }
+
+ @Override
+ public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp,
+ Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ if (traceTuples) {
+ LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
+ }
+ ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(
+ WindowedValue.of(output, timestamp, windows, pane)));
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ return timerInternals;
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
+ }
+
+ @Override
+ public PaneInfo pane() {
+ throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
+ }
+
+ @Override
+ public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
+ Coder<T> elemCoder) throws IOException {
+ throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ throw new RuntimeException("sideInput() is not available in Streaming mode.");
+ }
+ };
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ // ignore the side output, this can happen when a user does not register
+ // side outputs but then outputs using a freshly created TupleTag.
+ throw new RuntimeException("sideOutput() is not available when grouping by window.");
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ sideOutput(tag, output);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * An implementation of Beam's {@link TimerInternals}.
+ *
+ */
+ public class ApexTimerInternals implements TimerInternals {
+
+ @Override
+ public void setTimer(TimerData timerKey) {
+ registerActiveTimer(context.element().key(), timerKey);
+ }
+
+ @Override
+ public void deleteTimer(TimerData timerKey) {
+ unregisterActiveTimer(context.element().key(), timerKey);
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return Instant.now();
+ }
+
+ @Override
+ public Instant currentSynchronizedProcessingTime() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return inputWatermark;
+ }
+
+ @Override
+ public Instant currentOutputWatermarkTime() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setTimer(StateNamespace namespace, String timerId, Instant target,
+ TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+ }
+
+ }
+
+ private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public StateInternals<K> stateInternalsForKey(K key) {
+ return getStateInternalsForKey(key);
+ }
+ }
+}