You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/08 03:04:37 UTC
[4/6] incubator-beam git commit: BEAM-261 Make translators package
private.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
new file mode 100644
index 0000000..44e7b11
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
+import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex operator for Beam {@link DoFn}.
+ */
+public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
+ private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
+ private boolean traceTuples = true;
+
+ @Bind(JavaSerializer.class)
+ private final SerializablePipelineOptions pipelineOptions;
+ @Bind(JavaSerializer.class)
+ private final OldDoFn<InputT, OutputT> doFn;
+ @Bind(JavaSerializer.class)
+ private final TupleTag<OutputT> mainOutputTag;
+ @Bind(JavaSerializer.class)
+ private final List<TupleTag<?>> sideOutputTags;
+ @Bind(JavaSerializer.class)
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ @Bind(JavaSerializer.class)
+ private final List<PCollectionView<?>> sideInputs;
+
+ private final StateInternals<Void> sideInputStateInternals;
+ private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
+ private LongMin pushedBackWatermark = new LongMin();
+ private long currentInputWatermark = Long.MIN_VALUE;
+ private long currentOutputWatermark = currentInputWatermark;
+
+ private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
+ private transient SideInputHandler sideInputHandler;
+ private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
+ Maps.newHashMapWithExpectedSize(5);
+
+  /**
+   * Creates a ParDo operator wrapping the given {@link OldDoFn}.
+   *
+   * <p>The state internals used for side inputs are obtained from {@code stateInternalsFactory}
+   * with a {@code null} key. The buffer of pushed-back elements starts out empty and is paired
+   * with a list coder derived from {@code inputCoder}.
+   *
+   * @throws UnsupportedOperationException if more side output tags are supplied than this
+   *     operator declares side output ports
+   */
+  public ApexParDoOperator(
+      ApexPipelineOptions pipelineOptions,
+      OldDoFn<InputT, OutputT> doFn,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy,
+      List<PCollectionView<?>> sideInputs,
+      Coder<WindowedValue<InputT>> inputCoder,
+      StateInternalsFactory<Void> stateInternalsFactory) {
+    this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
+    this.doFn = doFn;
+    this.mainOutputTag = mainOutputTag;
+    this.sideOutputTags = sideOutputTags;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputs = sideInputs;
+    this.sideInputStateInternals = stateInternalsFactory.stateInternalsForKey(null);
+
+    // The number of side outputs is bounded by the statically declared output ports.
+    if (sideOutputTags.size() > sideOutputPorts.length) {
+      throw new UnsupportedOperationException(String.format(
+          "Too many side outputs (currently only supporting %s).", sideOutputPorts.length));
+    }
+
+    Coder<List<WindowedValue<InputT>>> pushedBackCoder = ListCoder.of(inputCoder);
+    List<WindowedValue<InputT>> nothingPushedBackYet = new ArrayList<>();
+    this.pushedBack = new ValueAndCoderKryoSerializable<>(nothingPushedBackYet, pushedBackCoder);
+  }
+
+  /**
+   * No-argument constructor for Kryo; every final field is left {@code null}.
+   */
+  @SuppressWarnings("unused") // for Kryo
+  private ApexParDoOperator() {
+    // configuration supplied by the public constructor
+    this.pipelineOptions = null;
+    this.doFn = null;
+    this.mainOutputTag = null;
+    this.sideOutputTags = null;
+    this.windowingStrategy = null;
+    this.sideInputs = null;
+    // checkpointed state
+    this.sideInputStateInternals = null;
+    this.pushedBack = null;
+  }
+
+  /**
+   * Main input port. Watermark tuples are forwarded to {@code processWatermark}; data tuples are
+   * run through the DoFn, and any elements handed back by the runner are buffered in
+   * {@code pushedBack} with their timestamps folded into {@code pushedBackWatermark}.
+   */
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
+        @Override
+        public void process(ApexStreamTuple<WindowedValue<InputT>> t) {
+          if (t instanceof ApexStreamTuple.WatermarkTuple) {
+            processWatermark((ApexStreamTuple.WatermarkTuple<?>) t);
+            return;
+          }
+
+          if (traceTuples) {
+            LOG.debug("\ninput {}\n", t.getValue());
+          }
+          for (WindowedValue<InputT> notReady : processElementInReadyWindows(t.getValue())) {
+            pushedBackWatermark.add(notReady.getTimestamp().getMillis());
+            pushedBack.get().add(notReady);
+          }
+        }
+      };
+
+  /**
+   * Optional side input port. Each data tuple is added to the side input handler for the view
+   * selected by the tuple's union tag, after which all buffered (pushed back) elements are
+   * re-processed and the buffer is rebuilt from whatever is pushed back again.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() {
+        @Override
+        public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) {
+          if (t instanceof ApexStreamTuple.WatermarkTuple) {
+            // ignore side input watermarks
+            return;
+          }
+
+          // Tuples that are not DataTuples fall back to the first side input.
+          final int sideInputIndex = (t instanceof ApexStreamTuple.DataTuple)
+              ? ((ApexStreamTuple.DataTuple<?>) t).getUnionTag()
+              : 0;
+
+          if (traceTuples) {
+            LOG.debug("\nsideInput {} {}\n", sideInputIndex, t.getValue());
+          }
+
+          PCollectionView<?> view = sideInputs.get(sideInputIndex);
+          sideInputHandler.addSideInputValue(view, t.getValue());
+
+          // Retry everything that was waiting; collect what is pushed back once more.
+          List<WindowedValue<InputT>> stillPushedBack = new ArrayList<>();
+          for (WindowedValue<InputT> waiting : pushedBack.get()) {
+            Iterables.addAll(stillPushedBack, processElementInReadyWindows(waiting));
+          }
+
+          // Rebuild the buffer and its minimum timestamp from scratch.
+          pushedBack.get().clear();
+          pushedBackWatermark.clear();
+          for (WindowedValue<InputT> remaining : stillPushedBack) {
+            pushedBackWatermark.add(remaining.getTimestamp().getMillis());
+            pushedBack.get().add(remaining);
+          }
+
+          // potentially emit watermark
+          processWatermark(ApexStreamTuple.WatermarkTuple.of(currentInputWatermark));
+        }
+      };
+
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
+
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 =
+ new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 =
+ new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 =
+ new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 =
+ new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 =
+ new DefaultOutputPort<>();
+
+ public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2,
+ sideOutput3, sideOutput4, sideOutput5};
+
+  /**
+   * Emits a value produced by the DoFn. Tags registered in {@code sideOutputPortMapping} go to
+   * their side output port; every other tag goes to the main {@code output} port.
+   */
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
+    DefaultOutputPort<ApexStreamTuple<?>> target = sideOutputPortMapping.get(tag);
+    if (target == null) {
+      target = output;
+    }
+    target.emit(ApexStreamTuple.DataTuple.of(tuple));
+
+    if (traceTuples) {
+      LOG.debug("\nemitting {}\n", tuple);
+    }
+  }
+
+  /**
+   * Runs one element through the pushback runner inside its own start/finish bundle pair.
+   *
+   * @return the elements the runner pushed back instead of processing
+   * @throws UserCodeException rethrown unchanged; when its cause is an {@link AssertionError}
+   *     that error is first recorded in {@code ApexRunner.assertionError}
+   */
+  private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+    try {
+      pushbackDoFnRunner.startBundle();
+      Iterable<WindowedValue<InputT>> notReady =
+          pushbackDoFnRunner.processElementInReadyWindows(elem);
+      pushbackDoFnRunner.finishBundle();
+      return notReady;
+    } catch (UserCodeException ue) {
+      Throwable cause = ue.getCause();
+      if (cause instanceof AssertionError) {
+        ApexRunner.assertionError = (AssertionError) cause;
+      }
+      throw ue;
+    }
+  }
+
+  /**
+   * Records the input watermark and emits an output watermark when appropriate.
+   *
+   * <p>Without side inputs the incoming watermark is forwarded as is. With side inputs the
+   * output watermark is the minimum of the input watermark and the smallest timestamp among
+   * pushed-back elements, and it is only emitted when it advances.
+   */
+  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
+    this.currentInputWatermark = mark.getTimestamp();
+
+    if (sideInputs.isEmpty()) {
+      if (traceTuples) {
+        LOG.debug("\nemitting watermark {}\n", mark);
+      }
+      output.emit(mark);
+      return;
+    }
+
+    long candidate = Math.min(pushedBackWatermark.get(), currentInputWatermark);
+    if (candidate <= currentOutputWatermark) {
+      // held back by buffered elements, or no progress since the last emission
+      return;
+    }
+
+    currentOutputWatermark = candidate;
+    if (traceTuples) {
+      LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
+    }
+    output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
+  }
+
+  /**
+   * Wires up the runtime (transient) parts of the operator: the side input reader, the mapping
+   * from side output tags to output ports, and the DoFn runner chain. Finally invokes
+   * {@code doFn.setup()}.
+   *
+   * @throws RuntimeException if {@code doFn.setup()} fails; unchecked exceptions are rethrown
+   *     as is and checked exceptions are wrapped
+   */
+  @Override
+  public void setup(OperatorContext context) {
+    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+    SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
+    if (!sideInputs.isEmpty()) {
+      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
+      sideInputReader = sideInputHandler;
+    }
+
+    // Side output tag i is served by the i-th declared side output port.
+    for (int i = 0; i < sideOutputTags.size(); i++) {
+      @SuppressWarnings("unchecked")
+      DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)
+          sideOutputPorts[i];
+      sideOutputPortMapping.put(sideOutputTags.get(i), port);
+    }
+
+    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
+        pipelineOptions.get(),
+        doFn,
+        sideInputReader,
+        this,
+        mainOutputTag,
+        sideOutputTags,
+        new NoOpStepContext(),
+        new NoOpAggregatorFactory(),
+        windowingStrategy
+    );
+
+    pushbackDoFnRunner =
+        PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+
+    try {
+      doFn.setup();
+    } catch (Exception e) {
+      Throwables.propagateIfPossible(e);
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  /**
+   * Invokes {@code doFn.teardown()} so that whatever the user function acquired in
+   * {@code doFn.setup()} can be released, then delegates to the base operator.
+   *
+   * @throws RuntimeException if {@code doFn.teardown()} fails; unchecked exceptions are rethrown
+   *     as is and checked exceptions are wrapped
+   */
+  @Override
+  public void teardown() {
+    try {
+      doFn.teardown();
+    } catch (Exception e) {
+      Throwables.propagateIfPossible(e);
+      throw new RuntimeException(e);
+    }
+    super.teardown();
+  }
+
+  /** Nothing to do at the start of a streaming window. */
+  @Override
+  public void beginWindow(long windowId) {
+    // intentionally empty
+  }
+
+  /** Nothing to do at the end of a streaming window. */
+  @Override
+  public void endWindow() {
+    // intentionally empty
+  }
+
+  /**
+   * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
+   * It is called from {@link org.apache.beam.runners.core.SimpleDoFnRunner}.
+   *
+   * <p>The aggregators it creates discard every added value, but report the name and
+   * {@link CombineFn} they were created with rather than {@code null}.
+   */
+  public static class NoOpAggregatorFactory implements AggregatorFactory {
+
+    private NoOpAggregatorFactory() {
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+        Class<?> fnClass, ExecutionContext.StepContext step,
+        String name, CombineFn<InputT, AccumT, OutputT> combine) {
+      return new NoOpAggregator<InputT, OutputT>(name, combine);
+    }
+
+    /**
+     * Aggregator that ignores all values added to it.
+     */
+    private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
+        java.io.Serializable {
+      private static final long serialVersionUID = 1L;
+
+      private final String name;
+      private final CombineFn<InputT, ?, OutputT> combineFn;
+
+      private NoOpAggregator(String name, CombineFn<InputT, ?, OutputT> combineFn) {
+        this.name = name;
+        this.combineFn = combineFn;
+      }
+
+      @Override
+      public void addValue(InputT value) {
+        // values are dropped until aggregation is implemented
+      }
+
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public CombineFn<InputT, ?, OutputT> getCombineFn() {
+        return combineFn;
+      }
+
+    }
+  }
+
+  /**
+   * Tracks the smallest {@code long} added so far; {@link Long#MAX_VALUE} means "nothing added".
+   */
+  private static class LongMin {
+    long state = Long.MAX_VALUE;
+
+    /** Lowers the tracked minimum if {@code l} is smaller than it. */
+    public void add(long l) {
+      if (l < state) {
+        state = l;
+      }
+    }
+
+    /** Returns the current minimum, or {@link Long#MAX_VALUE} when nothing was added. */
+    public long get() {
+      return state;
+    }
+
+    /** Forgets all added values. */
+    public void clear() {
+      state = Long.MAX_VALUE;
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
new file mode 100644
index 0000000..6fc2f0c
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple;
+import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex input operator that wraps Beam {@link UnboundedSource}.
+ */
+public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
+ extends UnboundedSource.CheckpointMark> implements InputOperator {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ApexReadUnboundedInputOperator.class);
+ private boolean traceTuples = false;
+ private long outputWatermark = 0;
+
+ @Bind(JavaSerializer.class)
+ private final SerializablePipelineOptions pipelineOptions;
+ @Bind(JavaSerializer.class)
+ private final UnboundedSource<OutputT, CheckpointMarkT> source;
+ private final boolean isBoundedSource;
+ private transient UnboundedSource.UnboundedReader<OutputT> reader;
+ private transient boolean available = false;
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output =
+ new DefaultOutputPort<>();
+
+  /**
+   * Creates an operator for the given source with {@code isBoundedSource} set to {@code false}.
+   */
+  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
+      ApexPipelineOptions options) {
+    this(source, false, options);
+  }
+
+  /**
+   * Creates an operator for the given source.
+   *
+   * @param isBoundedSource when {@code true}, the operator emits a final watermark and shuts
+   *     down once the reader reports no more data (see {@code beginWindow})
+   */
+  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
+      boolean isBoundedSource, ApexPipelineOptions options) {
+    this.source = source;
+    this.isBoundedSource = isBoundedSource;
+    this.pipelineOptions = new SerializablePipelineOptions(options);
+  }
+
+  /**
+   * No-argument constructor for Kryo; the final fields are left at their default values.
+   */
+  @SuppressWarnings("unused") // for Kryo
+  private ApexReadUnboundedInputOperator() {
+    this.pipelineOptions = null;
+    this.source = null;
+    this.isBoundedSource = false;
+  }
+
+ @Override
+ public void beginWindow(long windowId) {
+ if (!available && (isBoundedSource || source instanceof ValuesSource)) {
+ // if it's a Create and the input was consumed, emit final watermark
+ emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ // terminate the stream (allows tests to finish faster)
+ BaseOperator.shutdown();
+ } else {
+ emitWatermarkIfNecessary(reader.getWatermark().getMillis());
+ }
+ }
+
+  /**
+   * Emits {@code mark} as a watermark tuple, but only if it is greater than the last watermark
+   * this operator emitted.
+   */
+  private void emitWatermarkIfNecessary(long mark) {
+    if (mark <= outputWatermark) {
+      return;
+    }
+
+    outputWatermark = mark;
+    if (traceTuples) {
+      LOG.debug("\nemitting watermark {}\n", mark);
+    }
+    output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark));
+  }
+
+  /** Nothing to do at the end of a streaming window. */
+  @Override
+  public void endWindow() {
+    // intentionally empty
+  }
+
+  /**
+   * Resolves the trace flag from the pipeline options, creates a reader for the source (passing
+   * {@code null} as the checkpoint mark) and starts it.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised while creating or starting
+   *     the reader
+   */
+  @Override
+  public void setup(OperatorContext context) {
+    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+    try {
+      this.reader = source.createReader(this.pipelineOptions.get(), null);
+      // remember whether the reader is already positioned on a first record
+      this.available = reader.start();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Closes the reader if one was created.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised by the close
+   */
+  @Override
+  public void teardown() {
+    if (reader == null) {
+      return;
+    }
+    try {
+      reader.close();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Emits at most one record per call. If no record is pending the reader is advanced first;
+   * when a record is available it is read, the reader is advanced to determine availability for
+   * the next call, and the record is emitted in the global window with no pane firing.
+   */
+  @Override
+  public void emitTuples() {
+    try {
+      if (!available) {
+        available = reader.advance();
+      }
+      if (!available) {
+        return;
+      }
+
+      OutputT record = reader.getCurrent();
+      Instant recordTimestamp = reader.getCurrentTimestamp();
+      available = reader.advance();
+      if (traceTuples) {
+        LOG.debug("\nemitting '{}' timestamp {}\n", record, recordTimestamp);
+      }
+      WindowedValue<OutputT> windowed = WindowedValue.of(
+          record, recordTimestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+      output.emit(DataTuple.of(windowed));
+    } catch (Exception e) {
+      Throwables.propagateIfPossible(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java
new file mode 100644
index 0000000..6bc0194
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Apex operators used by the Beam runner for Apache Apex to execute Beam transforms.
+ */
+package org.apache.beam.runners.apex.translation.operators;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java
new file mode 100644
index 0000000..de954c0
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translation;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
new file mode 100644
index 0000000..17a4f81
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTag.StateBinder;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.joda.time.Instant;
+
+/**
+ * Implementation of {@link StateInternals} that can be serialized and
+ * checkpointed with the operator. Suitable for small states, in the future this
+ * should be based on the incremental state saving components in the Apex
+ * library.
+ */
+@DefaultSerializer(JavaSerializer.class)
+public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
+ private static final long serialVersionUID = 1L;
+  /**
+   * Creates a new, empty state container bound to {@code key}.
+   */
+  public static <K> ApexStateInternals<K> forKey(K key) {
+    return new ApexStateInternals<K>(key);
+  }
+
+ private final K key;
+
+  /**
+   * Creates state internals for the given key.
+   */
+  protected ApexStateInternals(K key) {
+    this.key = key;
+  }
+
+  /** Returns the key these state internals were created for. */
+  @Override
+  public K getKey() {
+    return this.key;
+  }
+
+ /**
+ * Serializable state for internals (namespace to state tag to coded value).
+ */
+ private final Table<String, String, byte[]> stateTable = HashBasedTable.create();
+
+  /**
+   * Looks up state for the given namespace and tag using the null state context.
+   */
+  @Override
+  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+    final StateContext<?> noContext = StateContexts.nullContext();
+    return state(namespace, address, noContext);
+  }
+
+  /**
+   * Looks up state for the given namespace and tag by binding the tag to a fresh
+   * {@code ApexStateBinder}.
+   */
+  @Override
+  public <T extends State> T state(
+      StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
+    ApexStateBinder binder = new ApexStateBinder(key, namespace, address, c);
+    return address.bind(binder);
+  }
+
+  /**
+   * A {@link StateBinder} that returns {@link State} wrappers for serialized state.
+   *
+   * <p>Each {@code bind*} method creates a new wrapper for the binder's namespace and the given
+   * tag.
+   */
+  private class ApexStateBinder implements StateBinder<K> {
+    private final K key;
+    private final StateNamespace namespace;
+    private final StateContext<?> c;
+
+    /** The {@code address} argument is accepted but not retained. */
+    private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super K, ?> address,
+        StateContext<?> c) {
+      this.key = key;
+      this.namespace = namespace;
+      this.c = c;
+    }
+
+    @Override
+    public <T> ValueState<T> bindValue(
+        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+      return new ApexValueState<>(namespace, address, coder);
+    }
+
+    @Override
+    public <T> BagState<T> bindBag(
+        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+      return new ApexBagState<>(namespace, address, elemCoder);
+    }
+
+    @Override
+    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+        StateTag<? super K, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
+      return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn);
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+        bindCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            final CombineFn<InputT, AccumT, OutputT> combineFn) {
+      // An unkeyed combine fn is adapted to its keyed form and then bound like any keyed fn.
+      return bindKeyedCombiningValue(address, accumCoder, combineFn.<K>asKeyedFn());
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+        bindKeyedCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+      return new ApexAccumulatorCombiningState<>(
+          namespace,
+          address,
+          accumCoder,
+          key,
+          combineFn);
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+        bindKeyedCombiningValueWithContext(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
+      // Bind the state context into the fn, then treat it as a plain keyed combine fn.
+      return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+    }
+  }
+
+  /**
+   * Base class for state cells whose value is stored, encoded with a {@link Coder}, in the
+   * enclosing {@code stateTable} under (namespace string key, tag id).
+   *
+   * <p>Equality and hash code are based on the namespace and the tag only.
+   */
+  private class AbstractState<T> {
+    protected final StateNamespace namespace;
+    protected final StateTag<?, ? extends State> address;
+    protected final Coder<T> coder;
+
+    private AbstractState(
+        StateNamespace namespace,
+        StateTag<?, ? extends State> address,
+        Coder<T> coder) {
+      this.namespace = namespace;
+      this.address = address;
+      this.coder = coder;
+    }
+
+    /**
+     * Decodes and returns the stored value, or {@code null} when the table has no entry.
+     */
+    protected T readValue() {
+      byte[] encoded = stateTable.get(namespace.stringKey(), address.getId());
+      if (encoded == null) {
+        return null;
+      }
+      // TODO: reuse input
+      Input input = new Input(encoded);
+      try {
+        return coder.decode(input, Context.OUTER);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Encodes {@code input} and stores the bytes, replacing any previous entry.
+     */
+    public void writeValue(T input) {
+      ByteArrayOutputStream encoded = new ByteArrayOutputStream();
+      try {
+        coder.encode(input, encoded, Context.OUTER);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      stateTable.put(namespace.stringKey(), address.getId(), encoded.toByteArray());
+    }
+
+    /** Removes the stored entry, if any. */
+    public void clear() {
+      stateTable.remove(namespace.stringKey(), address.getId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      }
+      AbstractState<?> other = (AbstractState<?>) o;
+      return namespace.equals(other.namespace) && address.equals(other.address);
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * namespace.hashCode() + address.hashCode();
+    }
+  }
+
+  /**
+   * {@link ValueState} whose reads and writes go straight to the encoded entry in the state
+   * table.
+   */
+  private class ApexValueState<T> extends AbstractState<T> implements ValueState<T> {
+
+    private ApexValueState(
+        StateNamespace namespace,
+        StateTag<?, ValueState<T>> address,
+        Coder<T> coder) {
+      super(namespace, address, coder);
+    }
+
+    /** Does nothing beyond returning this instance. */
+    @Override
+    public ApexValueState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public T read() {
+      return this.readValue();
+    }
+
+    @Override
+    public void write(T input) {
+      this.writeValue(input);
+    }
+  }
+
+ private final class ApexWatermarkHoldState<W extends BoundedWindow>
+ extends AbstractState<Instant> implements WatermarkHoldState<W> {
+
+ private final OutputTimeFn<? super W> outputTimeFn;
+
+ public ApexWatermarkHoldState(
+ StateNamespace namespace,
+ StateTag<?, WatermarkHoldState<W>> address,
+ OutputTimeFn<? super W> outputTimeFn) {
+ super(namespace, address, InstantCoder.of());
+ this.outputTimeFn = outputTimeFn;
+ }
+
+ @Override
+ public ApexWatermarkHoldState<W> readLater() {
+ return this;
+ }
+
+ @Override
+ public Instant read() {
+ return readValue();
+ }
+
+ @Override
+ public void add(Instant outputTime) {
+ Instant combined = read();
+ combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
+ writeValue(combined);
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public ReadableState<Boolean> readLater() {
+ return this;
+ }
+ @Override
+ public Boolean read() {
+ return stateTable.get(namespace.stringKey(), address.getId()) == null;
+ }
+ };
+ }
+
+ @Override
+ public OutputTimeFn<? super W> getOutputTimeFn() {
+ return outputTimeFn;
+ }
+
+ }
+
+ /**
+  * {@link AccumulatorCombiningState} that persists the accumulator of a
+  * {@link KeyedCombineFn} as coder-encoded bytes.
+  *
+  * <p>Every mutation reads the stored accumulator (or creates a fresh one when none
+  * is stored), applies the combine function and writes the result back.
+  */
+ private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+     extends AbstractState<AccumT>
+     implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+   private final K key;
+   private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+
+   private ApexAccumulatorCombiningState(StateNamespace namespace,
+       StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+       Coder<AccumT> coder,
+       K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+     super(namespace, address, coder);
+     this.key = key;
+     this.combineFn = combineFn;
+   }
+
+   @Override
+   public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
+     return this;
+   }
+
+   /** Extracts the output from the current (or a freshly created) accumulator. */
+   @Override
+   public OutputT read() {
+     return combineFn.extractOutput(key, getAccum());
+   }
+
+   /**
+    * Adds {@code input} to the accumulator and persists the result.
+    *
+    * <p>The accumulator returned by {@code addInput} is the one that is stored: a
+    * combine function may return a new accumulator instead of mutating the one it
+    * was given, in which case persisting the argument would drop the input.
+    */
+   @Override
+   public void add(InputT input) {
+     AccumT updated = combineFn.addInput(key, getAccum(), input);
+     writeValue(updated);
+   }
+
+   /**
+    * Returns the stored accumulator, or a newly created one when nothing is stored.
+    * The returned accumulator is never {@code null} unless the combine function
+    * itself creates a {@code null} accumulator.
+    */
+   @Override
+   public AccumT getAccum() {
+     AccumT accum = readValue();
+     if (accum == null) {
+       accum = combineFn.createAccumulator(key);
+     }
+     return accum;
+   }
+
+   /** Reports emptiness as the absence of a table entry for this cell. */
+   @Override
+   public ReadableState<Boolean> isEmpty() {
+     return new ReadableState<Boolean>() {
+       @Override
+       public ReadableState<Boolean> readLater() {
+         return this;
+       }
+       @Override
+       public Boolean read() {
+         return stateTable.get(namespace.stringKey(), address.getId()) == null;
+       }
+     };
+   }
+
+   /** Merges {@code accum} into the current accumulator and persists the result. */
+   @Override
+   public void addAccum(AccumT accum) {
+     AccumT merged = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum));
+     writeValue(merged);
+   }
+
+   /** Merges the given accumulators without touching the stored state. */
+   @Override
+   public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+     return combineFn.mergeAccumulators(key, accumulators);
+   }
+
+ }
+
+ private final class ApexBagState<T> extends AbstractState<List<T>> implements BagState<T> {
+ private ApexBagState(
+ StateNamespace namespace,
+ StateTag<?, BagState<T>> address,
+ Coder<T> coder) {
+ super(namespace, address, ListCoder.of(coder));
+ }
+
+ @Override
+ public ApexBagState<T> readLater() {
+ return this;
+ }
+
+ @Override
+ public List<T> read() {
+ List<T> value = super.readValue();
+ if (value == null) {
+ value = new ArrayList<>();
+ }
+ return value;
+ }
+
+ @Override
+ public void add(T input) {
+ List<T> value = read();
+ value.add(input);
+ writeValue(value);
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public ReadableState<Boolean> readLater() {
+ return this;
+ }
+
+ @Override
+ public Boolean read() {
+ return stateTable.get(namespace.stringKey(), address.getId()) == null;
+ }
+ };
+ }
+ }
+
+ /**
+ * Serializable factory for {@link ApexStateInternals}; each request is answered
+ * by calling {@code ApexStateInternals.forKey(key)}.
+ *
+ * @param <K> key type
+ */
+ public static class ApexStateInternalsFactory<K>
+ implements StateInternalsFactory<K>, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Returns the state internals for the given key.
+ *
+ * @param key the key the state is scoped to
+ * @return the result of {@code ApexStateInternals.forKey(key)}
+ */
+ @Override
+ public StateInternals<K> stateInternalsForKey(K key) {
+ return ApexStateInternals.forKey(key);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
new file mode 100644
index 0000000..79a4f1b
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.datatorrent.api.Operator;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+
+/**
+ * The common interface for all objects transmitted through streams.
+ *
+ * @param <T> The actual payload type.
+ */
+public interface ApexStreamTuple<T> {
+ /**
+ * Gets the value of the tuple.
+ *
+ * @return tuple
+ */
+ T getValue();
+
+ /**
+  * Data tuple class.
+  *
+  * <p>Carries a payload value together with an integer union tag; tuples created
+  * through {@link #of(Object)} use tag {@code 0}.
+  *
+  * @param <T> tuple type
+  */
+ class DataTuple<T> implements ApexStreamTuple<T> {
+   private int unionTag;
+   private T value;
+
+   /** Creates a data tuple with union tag {@code 0}. */
+   public static <T> DataTuple<T> of(T value) {
+     return new DataTuple<>(value, 0);
+   }
+
+   private DataTuple(T value, int unionTag) {
+     this.value = value;
+     this.unionTag = unionTag;
+   }
+
+   @Override
+   public T getValue() {
+     return value;
+   }
+
+   public void setValue(T value) {
+     this.value = value;
+   }
+
+   public int getUnionTag() {
+     return unionTag;
+   }
+
+   public void setUnionTag(int unionTag) {
+     this.unionTag = unionTag;
+   }
+
+   /**
+    * Returns the string form of the payload, or {@code "null"} when the payload is
+    * {@code null}, so that printing a tuple never throws.
+    */
+   @Override
+   public String toString() {
+     return String.valueOf(value);
+   }
+
+ }
+
+ /**
+  * Tuple that includes a timestamp.
+  *
+  * <p>Equality compares the timestamp and the payload; the hash code is derived
+  * from the timestamp alone.
+  *
+  * @param <T> tuple type
+  */
+ class TimestampedTuple<T> extends DataTuple<T> {
+   private long timestamp;
+
+   public TimestampedTuple(long timestamp, T value) {
+     super(value, 0);
+     this.timestamp = timestamp;
+   }
+
+   public long getTimestamp() {
+     return timestamp;
+   }
+
+   public void setTimestamp(long timestamp) {
+     this.timestamp = timestamp;
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hash(timestamp);
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+     if (!(obj instanceof TimestampedTuple)) {
+       return false;
+     }
+     TimestampedTuple<?> that = (TimestampedTuple<?>) obj;
+     return timestamp == that.timestamp && Objects.equals(getValue(), that.getValue());
+   }
+
+ }
+
+ /**
+ * Tuple that represents a watermark: a {@link TimestampedTuple} that carries only
+ * a timestamp and whose payload is always {@code null}.
+ *
+ * @param <T> tuple type
+ */
+ class WatermarkTuple<T> extends TimestampedTuple<T> {
+ /** Creates a watermark tuple for the given timestamp. */
+ public static <T> WatermarkTuple<T> of(long timestamp) {
+ return new WatermarkTuple<>(timestamp);
+ }
+
+ protected WatermarkTuple(long timestamp) {
+ // A watermark has no payload, hence the null value.
+ super(timestamp, null);
+ }
+
+ @Override
+ public String toString() {
+ return "[Watermark " + getTimestamp() + "]";
+ }
+ }
+
+ /**
+  * Coder for {@link ApexStreamTuple}.
+  *
+  * <p>Wire format: one marker byte ({@code 1} for a watermark, {@code 0} for data).
+  * A watermark is followed by its timestamp as an 8-byte long; a data tuple is
+  * followed by one byte holding the union tag and then the payload encoded with the
+  * value coder. Only the low-order 8 bits of the union tag are written, so tags
+  * outside {@code 0..255} do not round-trip. A {@link TimestampedTuple} that is not
+  * a watermark is encoded like a plain data tuple, without its timestamp.
+  */
+ class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
+   private static final long serialVersionUID = 1L;
+   final Coder<T> valueCoder;
+
+   public static <T> ApexStreamTupleCoder<T> of(Coder<T> valueCoder) {
+     return new ApexStreamTupleCoder<>(valueCoder);
+   }
+
+   protected ApexStreamTupleCoder(Coder<T> valueCoder) {
+     this.valueCoder = checkNotNull(valueCoder);
+   }
+
+   @Override
+   public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
+       throws CoderException, IOException {
+     if (value instanceof WatermarkTuple) {
+       outStream.write(1);
+       new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp());
+     } else {
+       outStream.write(0);
+       outStream.write(((DataTuple<?>) value).unionTag);
+       valueCoder.encode(value.getValue(), outStream, context);
+     }
+   }
+
+   /**
+    * Decodes one tuple.
+    *
+    * @throws CoderException if the stream ends before the marker byte or the union
+    *     tag could be read
+    */
+   @Override
+   public ApexStreamTuple<T> decode(InputStream inStream, Context context)
+       throws CoderException, IOException {
+     // InputStream.read() signals end of stream with -1; fail explicitly instead of
+     // treating it as a data tuple.
+     int marker = inStream.read();
+     if (marker < 0) {
+       throw new CoderException("Unexpected end of stream while reading the tuple marker");
+     }
+     if (marker == 1) {
+       return new WatermarkTuple<>(new DataInputStream(inStream).readLong());
+     }
+     int unionTag = inStream.read();
+     if (unionTag < 0) {
+       throw new CoderException("Unexpected end of stream while reading the union tag");
+     }
+     return new DataTuple<>(valueCoder.decode(inStream, context), unionTag);
+   }
+
+   @Override
+   public List<? extends Coder<?>> getCoderArguments() {
+     return Arrays.<Coder<?>>asList(valueCoder);
+   }
+
+   @Override
+   public void verifyDeterministic() throws NonDeterministicException {
+     verifyDeterministic(
+         this.getClass().getSimpleName() + " requires a deterministic valueCoder",
+         valueCoder);
+   }
+
+   /**
+    * Returns the value coder.
+    */
+   public Coder<T> getValueCoder() {
+     return valueCoder;
+   }
+
+ }
+
+ /**
+ * Central switch that decides whether data tuples received on and emitted from
+ * ports should be logged. Should be called in setup, with the result cached in
+ * the operator.
+ */
+ final class Logging {
+ /**
+ * Returns {@code options.isTupleTracingEnabled()}; the {@code operator} argument
+ * is currently not consulted.
+ */
+ public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) {
+ return options.isTupleTracingEnabled();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
new file mode 100644
index 0000000..d08e76f
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+
+/**
+ * The Apex {@link StreamCodec} adapter for using Beam {@link Coder}.
+ *
+ * <p>Tuples are encoded and decoded with the wrapped coder in the
+ * {@link Context#OUTER} context; any {@link IOException} raised by the coder is
+ * rethrown wrapped in a {@link RuntimeException}.
+ */
+public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable {
+  private static final long serialVersionUID = 1L;
+  private final Coder<? super Object> coder;
+
+  public CoderAdapterStreamCodec(Coder<? super Object> coder) {
+    this.coder = coder;
+  }
+
+  /** Decodes the tuple contained in the given slice of a byte buffer. */
+  @Override
+  public Object fromByteArray(Slice fragment) {
+    ByteArrayInputStream encoded =
+        new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
+    try {
+      return coder.decode(encoded, Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Encodes the tuple into a new slice that covers the whole encoded byte array. */
+  @Override
+  public Slice toByteArray(Object wv) {
+    ByteArrayOutputStream encoded = new ByteArrayOutputStream();
+    try {
+      coder.encode(wv, encoded, Context.OUTER);
+      return new Slice(encoded.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Uses the tuple's hash code as the partition value. */
+  @Override
+  public int getPartition(Object o) {
+    return o.hashCode();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
new file mode 100644
index 0000000..078f95f
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Serializable {@link ExecutionContext.StepContext} that does nothing: every
+ * notification method has an empty body and every accessor returns {@code null}.
+ */
+public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /** Always returns {@code null}. */
+ @Override
+ public String getStepName() {
+ return null;
+ }
+
+ /** Always returns {@code null}. */
+ @Override
+ public String getTransformName() {
+ return null;
+ }
+
+ /** Ignores the output. */
+ @Override
+ public void noteOutput(WindowedValue<?> output) {
+ }
+
+ /** Ignores the side output. */
+ @Override
+ public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+ }
+
+ /** Discards the view data; nothing is written. */
+ @Override
+ public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag,
+ Iterable<WindowedValue<T>> data,
+ Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws
+ IOException {
+
+ }
+
+ /** Always returns {@code null}; callers must not expect usable state internals. */
+ @Override
+ public StateInternals<?> stateInternals() {
+ return null;
+ }
+
+ /** Always returns {@code null}; callers must not expect usable timer internals. */
+ @Override
+ public TimerInternals timerInternals() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
new file mode 100644
index 0000000..d0dce2b
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * A wrapper to enable serialization of {@link PipelineOptions}.
+ *
+ * <p>The options are externalized as a JSON string produced by Jackson and written
+ * with {@link ObjectOutput#writeUTF(String)}. Note that {@code writeUTF} can only
+ * write strings whose encoded form is at most 65535 bytes long.
+ */
+public class SerializablePipelineOptions implements Externalizable {
+
+  /**
+   * Shared mapper: building an {@link ObjectMapper} is comparatively expensive and
+   * a configured instance can be reused, so it is created once instead of on every
+   * write and read.
+   */
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private transient ApexPipelineOptions pipelineOptions;
+
+  public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
+  }
+
+  /** No-argument constructor required by {@link Externalizable}. */
+  public SerializablePipelineOptions() {
+  }
+
+  /**
+   * Returns the wrapped options; {@code null} on an instance created through the
+   * no-argument constructor before {@link #readExternal} has run.
+   */
+  public ApexPipelineOptions get() {
+    return this.pipelineOptions;
+  }
+
+  /** Writes the options as a single JSON string. */
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    out.writeUTF(MAPPER.writeValueAsString(pipelineOptions));
+  }
+
+  /** Reads the JSON string back and views the result as {@link ApexPipelineOptions}. */
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    String json = in.readUTF();
+    this.pipelineOptions = MAPPER.readValue(json, PipelineOptions.class)
+        .as(ApexPipelineOptions.class);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
new file mode 100644
index 0000000..395ad1f
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+
+
+/**
+ * A {@link KryoSerializable} holder that uses the specified {@link Coder}.
+ *
+ * <p>The serialized form consists of the coder's class, the coder instance written
+ * through Kryo's {@link JavaSerializer}, and finally the value encoded by that coder.
+ * {@link #read} consumes these three parts in the same order.
+ *
+ * @param <T> element type
+ */
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable {
+ private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
+ private T value;
+ private Coder<T> coder;
+
+ public ValueAndCoderKryoSerializable(T value, Coder<T> coder) {
+ this.value = value;
+ this.coder = coder;
+ }
+
+ @SuppressWarnings("unused") // for Kryo
+ private ValueAndCoderKryoSerializable() {
+ }
+
+ /** Returns the held value. */
+ public T get() {
+ return value;
+ }
+
+ /**
+ * Writes the coder class, the coder and the coder-encoded value, in that order.
+ * An {@link IOException} from the coder is rethrown as a {@link RuntimeException}.
+ */
+ @Override
+ public void write(Kryo kryo, Output output) {
+ try {
+ kryo.writeClass(output, coder.getClass());
+ kryo.writeObject(output, coder, JAVA_SERIALIZER);
+ // NOTE(review): the value is encoded in the OUTER context although this object
+ // may be followed by further data in the same Kryo stream - confirm that the
+ // coders used here do not depend on reading up to the end of the stream.
+ coder.encode(value, output, Context.OUTER);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Restores the coder and then the value, mirroring the order used by
+ * {@link #write}. An {@link IOException} from the coder is rethrown as a
+ * {@link RuntimeException}.
+ */
+ @Override
+ public void read(Kryo kryo, Input input) {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<Coder<T>> type = kryo.readClass(input).getType();
+ coder = kryo.readObject(input, type, JAVA_SERIALIZER);
+ value = coder.decode(input, Context.OUTER);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
new file mode 100644
index 0000000..8526618
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
+/**
+ * Unbounded source that reads from a Java {@link Iterable}.
+ */
+public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
+ private static final long serialVersionUID = 1L;
+
+ private final byte[] codedValues;
+ private final IterableCoder<T> iterableCoder;
+
+ public ValuesSource(Iterable<T> values, Coder<T> coder) {
+ this.iterableCoder = IterableCoder.of(coder);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ iterableCoder.encode(values, bos, Context.OUTER);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ this.codedValues = bos.toByteArray();
+ }
+
+ @Override
+ public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ return Collections.singletonList(this);
+ }
+
+ @Override
+ public UnboundedReader<T> createReader(PipelineOptions options,
+ @Nullable CheckpointMark checkpointMark) {
+ ByteArrayInputStream bis = new ByteArrayInputStream(codedValues);
+ try {
+ Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER);
+ return new ValuesReader<>(values, this);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Nullable
+ @Override
+ public Coder<CheckpointMark> getCheckpointMarkCoder() {
+ return null;
+ }
+
+ @Override
+ public void validate() {
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return iterableCoder.getElemCoder();
+ }
+
+ private static class ValuesReader<T> extends UnboundedReader<T> {
+
+ private final Iterable<T> values;
+ private final UnboundedSource<T, CheckpointMark> source;
+ private transient Iterator<T> iterator;
+ private T current;
+
+ public ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) {
+ this.values = values;
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ if (null == iterator) {
+ iterator = values.iterator();
+ }
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (iterator.hasNext()) {
+ current = iterator.next();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ return current;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return Instant.now();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return null;
+ }
+
+ @Override
+ public UnboundedSource<T, ?> getCurrentSource() {
+ return source;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java
new file mode 100644
index 0000000..534b645
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translation.utils;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
deleted file mode 100644
index 539f311..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.apex.translators.io.ValuesSource;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PBegin;
-
-
-/**
- * Wraps elements from Create.Values into an {@link UnboundedSource}.
- * mainly used for testing
- */
-public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
- private static final long serialVersionUID = 1451000241832745629L;
-
- @Override
- public void translate(Create.Values<T> transform, TranslationContext context) {
- try {
- UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
- transform.getDefaultOutputCoder((PBegin) context.getInput()));
- ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
- unboundedSource, context.getPipelineOptions());
- context.addOperator(operator, operator.output);
- } catch (CannotProvideCoderException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
deleted file mode 100644
index a39aacb..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import com.google.common.collect.Lists;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.translators.functions.ApexFlattenOperator;
-import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.apex.translators.io.ValuesSource;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-
-/**
- * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
- */
-public class FlattenPCollectionTranslator<T> implements
-    TransformTranslator<Flatten.FlattenPCollectionList<T>> {
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * Translates the flatten. An empty input list becomes a source over an empty element list;
-   * otherwise the inputs are merged via {@link #flattenCollections} into the context's output.
-   */
-  @Override
-  public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
-    PCollectionList<T> input = context.getInput();
-    List<PCollection<T>> collections = input.getAll();
-
-    if (collections.isEmpty()) {
-      // create a dummy source that never emits anything
-      // VoidCoder only stands in for Coder<T>; the element list handed to the source is empty.
-      @SuppressWarnings("unchecked")
-      Coder<T> placeholderCoder = (Coder<T>) VoidCoder.of();
-      UnboundedSource<T, ?> unboundedSource =
-          new ValuesSource<>(Collections.<T>emptyList(), placeholderCoder);
-      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
-          unboundedSource, context.getPipelineOptions());
-      context.addOperator(operator, operator.output);
-    } else {
-      PCollection<T> output = context.getOutput();
-      Map<PCollection<?>, Integer> unionTags = Collections.emptyMap();
-      flattenCollections(collections, unionTags, output, context);
-    }
-  }
-
-  /**
-   * Flatten the given collections into the given result collection. Translates
-   * into a cascading merge with 2 input ports per operator. The optional union
-   * tags can be used to identify the source in the result stream, used to
-   * channel multiple side inputs to a single Apex operator port.
-   *
-   * <p>NOTE(review): when {@code collections} holds exactly one element no operator is created,
-   * so {@code finalCollection} is never registered as an output by this method. Confirm that
-   * callers handle (or never produce) the single-input case.
-   *
-   * @param collections the collections to merge; the list itself is not modified
-   * @param unionTags tag to assign per source collection; collections without an entry get tag 0
-   * @param finalCollection the collection registered as output of the last merge operator
-   * @param context the translation context the operators and streams are added to
-   * @throws UnsupportedOperationException if two collections merged by the same operator have
-   *     different coders
-   */
-  static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>,
-      Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) {
-    List<PCollection<T>> remainingCollections = Lists.newArrayList();
-    PCollection<T> firstCollection = null;
-    while (!collections.isEmpty()) {
-      for (PCollection<T> collection : collections) {
-        if (null == firstCollection) {
-          // first half of a pair; wait for its partner
-          firstCollection = collection;
-          continue;
-        }
-
-        // Validate before touching the context so a mismatch leaves no half-wired operator.
-        if (!collection.getCoder().equals(firstCollection.getCoder())) {
-          throw new UnsupportedOperationException("coders don't match");
-        }
-
-        ApexFlattenOperator<T> operator = new ApexFlattenOperator<>();
-        context.addStream(firstCollection, operator.data1);
-        operator.data1Tag = unionTagOrDefault(unionTags, firstCollection);
-        context.addStream(collection, operator.data2);
-        operator.data2Tag = unionTagOrDefault(unionTags, collection);
-
-        if (collections.size() > 2) {
-          // more merging to do: emit into an intermediate collection for the next level
-          PCollection<T> intermediateCollection = intermediateCollection(collection,
-              collection.getCoder());
-          context.addOperator(operator, operator.out, intermediateCollection);
-          remainingCollections.add(intermediateCollection);
-        } else {
-          // final stream merge
-          context.addOperator(operator, operator.out, finalCollection);
-        }
-        firstCollection = null;
-      }
-      if (firstCollection != null) {
-        // unpaired collection: push to next merge level
-        remainingCollections.add(firstCollection);
-        firstCollection = null;
-      }
-      if (remainingCollections.size() > 1) {
-        collections = remainingCollections;
-        remainingCollections = Lists.newArrayList();
-      } else {
-        collections = Lists.newArrayList();
-      }
-    }
-  }
-
-  /** Returns the union tag registered for {@code collection}, or 0 when there is none. */
-  private static int unionTagOrDefault(Map<PCollection<?>, Integer> unionTags,
-      PCollection<?> collection) {
-    Integer unionTag = unionTags.get(collection);
-    return (unionTag != null) ? unionTag : 0;
-  }
-
-  /**
-   * Creates a new collection in the pipeline of {@code input}, copying its windowing strategy
-   * and boundedness and using {@code outputCoder} as its coder.
-   */
-  static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
-    PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-        input.getWindowingStrategy(), input.isBounded());
-    output.setCoder(outputCoder);
-    return output;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
deleted file mode 100644
index cb78579..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Translates a {@link GroupByKey} into an {@link ApexGroupByKeyOperator}.
- */
-public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * Creates the group-by-key operator for the context's current input, registers the operator
-   * with its output port, and then connects the input collection to the operator's input port.
-   */
-  @Override
-  public void translate(GroupByKey<K, V> transform, TranslationContext context) {
-    final PCollection<KV<K, V>> keyedInput = context.getInput();
-    final ApexGroupByKeyOperator<K, V> operator =
-        new ApexGroupByKeyOperator<>(
-            context.getPipelineOptions(), keyedInput, context.<K>stateInternalsFactory());
-
-    context.addOperator(operator, operator.output);
-    context.addStream(keyedInput, operator.input);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
deleted file mode 100644
index 987b729..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.OutputPort;
-import com.google.common.collect.Maps;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
- */
-public class ParDoBoundMultiTranslator<InputT, OutputT>
-    implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslator.class);
-
-  /**
-   * Creates the ParDo operator, maps every output collection of the transform to the matching
-   * operator port (main output or indexed side output port), registers operator and input
-   * stream with the context and finally wires the side inputs, if any.
-   */
-  @Override
-  public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
-    PCollectionTuple output = context.getOutput();
-    PCollection<InputT> input = context.getInput();
-    List<PCollectionView<?>> sideInputs = transform.getSideInputs();
-    Coder<InputT> inputCoder = input.getCoder();
-    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
-
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
-        context.getPipelineOptions(),
-        doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
-        input.getWindowingStrategy(), sideInputs, wvInputCoder,
-        context.<Void>stateInternalsFactory()
-        );
-
-    TupleTag<?> mainOutputTag = transform.getMainOutputTag();
-    Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
-    Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
-    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
-      TupleTag<?> outputTag = outputEntry.getKey();
-      // Tags are compared by equality, not identity, so an equal tag instance still matches.
-      if (sameTag(outputTag, mainOutputTag)) {
-        ports.put(outputEntry.getValue(), operator.output);
-      } else {
-        // the side output port index is the tag's position in the side output tag list
-        int portIndex = 0;
-        for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) {
-          if (sameTag(tag, outputTag)) {
-            ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]);
-            break;
-          }
-          portIndex++;
-        }
-      }
-    }
-    context.addOperator(operator, ports);
-    context.addStream(input, operator.input);
-    if (!sideInputs.isEmpty()) {
-      addSideInputs(operator, sideInputs, context);
-    }
-  }
-
-  /** Null-safe equality of two tags; identical references are equal as well. */
-  private static boolean sameTag(TupleTag<?> left, TupleTag<?> right) {
-    return left == right || (left != null && left.equals(right));
-  }
-
-  /**
-   * Connects the given side inputs to the operator. The operator offers a single side input
-   * port here ({@code sideInput1}); when there are more side inputs than ports they are merged
-   * into one union collection that feeds that port, otherwise each side input gets its own port.
-   *
-   * @param operator the operator whose side input port(s) are connected
-   * @param sideInputs the side input views of the transform
-   * @param context the translation context the streams are added to
-   */
-  static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
-    Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
-    if (sideInputs.size() > sideInputPorts.length) {
-      PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
-      context.addStream(unionCollection, sideInputPorts[0]);
-    } else {
-      // the number of ports for side inputs is fixed and each port can only take one input.
-      for (int i = 0; i < sideInputs.size(); i++) {
-        context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
-      }
-    }
-  }
-
-  /**
-   * Flattens the collections behind the given side inputs into a single collection, tagging
-   * each source collection with its index in {@code sideInputs}.
-   *
-   * @throws IllegalArgumentException if fewer than two side inputs are given
-   * @throws UnsupportedOperationException if the side input collections use different coders
-   */
-  private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
-    checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
-    // flatten and assign union tag
-    List<PCollection<Object>> sourceCollections = new ArrayList<>();
-    Map<PCollection<?>, Integer> unionTags = new HashMap<>();
-    PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0));
-    for (int i = 0; i < sideInputs.size(); i++) {
-      PCollection<Object> sideInputCollection = context.getViewInput(sideInputs.get(i));
-      if (!sideInputCollection.getWindowingStrategy().equals(
-          firstSideInput.getWindowingStrategy())) {
-        // TODO: check how to handle this in stream codec; until then differing windowing
-        // strategies are only logged instead of being rejected.
-        LOG.warn("Side inputs union with different windowing strategies {} {}",
-            firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
-      }
-      if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
-        String msg = "Multiple side inputs with different coders.";
-        throw new UnsupportedOperationException(msg);
-      }
-      sourceCollections.add(sideInputCollection);
-      unionTags.put(sideInputCollection, i);
-    }
-
-    PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
-        firstSideInput, firstSideInput.getCoder());
-    FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
-        context);
-    return resultCollection;
-
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
deleted file mode 100644
index 92567a6..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import java.util.List;
-
-import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-/**
- * Translates {@link ParDo.Bound} into an {@link ApexParDoOperator} wrapping the {@link DoFn}.
- */
-public class ParDoBoundTranslator<InputT, OutputT> implements
-    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * Creates the operator with a fresh main output tag and no side output tags, registers it
-   * with its output port, connects the context's input to the operator's input port and, when
-   * the transform declares side inputs, delegates their wiring to
-   * {@link ParDoBoundMultiTranslator#addSideInputs}.
-   */
-  @Override
-  public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-    final OldDoFn<InputT, OutputT> fn = transform.getFn();
-    final PCollection<OutputT> result = context.getOutput();
-    final PCollection<InputT> mainInput = context.getInput();
-    final List<PCollectionView<?>> views = transform.getSideInputs();
-
-    // element coder combined with the coder of the input's window type
-    final Coder<InputT> elementCoder = mainInput.getCoder();
-    final WindowedValueCoder<InputT> windowedCoder =
-        FullWindowedValueCoder.of(
-            elementCoder, mainInput.getWindowingStrategy().getWindowFn().windowCoder());
-
-    final ApexParDoOperator<InputT, OutputT> parDo =
-        new ApexParDoOperator<>(
-            context.getPipelineOptions(),
-            fn,
-            new TupleTag<OutputT>(),
-            TupleTagList.empty().getAll() /*sideOutputTags*/,
-            result.getWindowingStrategy(),
-            views,
-            windowedCoder,
-            context.<Void>stateInternalsFactory());
-
-    context.addOperator(parDo, parDo.output);
-    context.addStream(context.getInput(), parDo.input);
-
-    if (views.isEmpty()) {
-      return;
-    }
-    ParDoBoundMultiTranslator.addSideInputs(parDo, views, context);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
deleted file mode 100644
index 3097276..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import com.datatorrent.api.InputOperator;
-
-import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-
-/**
- * Translates {@link Read.Unbounded} into an Apex {@link InputOperator} that wraps the
- * transform's {@link UnboundedSource}.
- */
-public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * Wraps the source of the given read transform in an {@link ApexReadUnboundedInputOperator}
-   * and registers that operator, with its output port, on the translation context.
-   */
-  @Override
-  public void translate(Read.Unbounded<T> transform, TranslationContext context) {
-    final UnboundedSource<T, ?> source = transform.getSource();
-    final ApexReadUnboundedInputOperator<T, ?> reader =
-        new ApexReadUnboundedInputOperator<>(source, context.getPipelineOptions());
-    context.addOperator(reader, reader.output);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
deleted file mode 100644
index dfd2045..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-
-import java.io.Serializable;
-
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * Translates {@link PTransform} to Apex functions.
- *
- * <p>The single method {@code translate(transform, context)} receives the transform to
- * translate together with the {@code TranslationContext} it should record the translation
- * result in. The interface extends {@link Serializable}, so every implementation is
- * serializable as well.
- *
- * @param <T> the concrete {@link PTransform} type handled by the translator
- */
-public interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable {
- void translate(T transform, TranslationContext context);
-}