You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:50 UTC
[32/39] incubator-beam git commit: BEAM-261 Make translators package
private.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
deleted file mode 100644
index 07c6494..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.Operator.OutputPort;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translators.utils.ApexStateInternals;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-/**
- * Maintains context data for {@link TransformTranslator}s.
- *
- * <p>While a pipeline is translated, translators register the Apex operators they create
- * together with the output port that produces each {@link PCollection}, and attach downstream
- * input ports to those collections. {@link #populateDAG(DAG)} finally adds the collected
- * operators and streams to the Apex {@link DAG}.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class TranslationContext {
-
-  private final ApexPipelineOptions pipelineOptions;
-  private AppliedPTransform<?, ?, ?> currentTransform;
-  // producing output port and the attached consumer input ports, per PCollection
-  private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>();
-  // registered operators keyed by their unique DAG name
-  private final Map<String, Operator> operators = new HashMap<>();
-  // for each registered view, the input of the transform that was current at registration
-  private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>();
-
-  public TranslationContext(ApexPipelineOptions pipelineOptions) {
-    this.pipelineOptions = pipelineOptions;
-  }
-
-  /**
-   * Remembers the input of the current transform as the input of the given view.
-   *
-   * @param view the view to register
-   */
-  public void addView(PCollectionView<?> view) {
-    this.viewInputs.put(view, this.getInput());
-  }
-
-  /**
-   * Returns the input that was recorded for the view through {@link #addView}.
-   *
-   * @param view a previously registered view
-   * @return the input recorded for the view
-   * @throws IllegalArgumentException if the view was never registered
-   */
-  public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) {
-    PInput input = this.viewInputs.get(view);
-    // template form avoids building the message when the check passes
-    checkArgument(input != null, "unknown view %s", view.getName());
-    return (InputT) input;
-  }
-
-  /**
-   * Makes the given node the transform that subsequent calls operate on.
-   *
-   * @param treeNode the pipeline node that is about to be translated
-   */
-  public void setCurrentTransform(TransformTreeNode treeNode) {
-    this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
-        treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
-  }
-
-  public ApexPipelineOptions getPipelineOptions() {
-    return pipelineOptions;
-  }
-
-  /**
-   * Returns the input of the current transform, cast to the type expected by the caller.
-   */
-  public <InputT extends PInput> InputT getInput() {
-    return (InputT) getCurrentTransform().getInput();
-  }
-
-  /**
-   * Returns the output of the current transform, cast to the type expected by the caller.
-   */
-  public <OutputT extends POutput> OutputT getOutput() {
-    return (OutputT) getCurrentTransform().getOutput();
-  }
-
-  private AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    checkArgument(currentTransform != null, "current transform not set");
-    return currentTransform;
-  }
-
-  /**
-   * Register the operator with a single output port that produces the output of the current
-   * transform.
-   *
-   * @param operator the operator to add
-   * @param port the operator's output port
-   */
-  public void addOperator(Operator operator, OutputPort port) {
-    addOperator(operator, port, this.<PCollection<?>>getOutput());
-  }
-
-  /**
-   * Register operator and output ports for the given collections.
-   *
-   * <p>The operator is registered exactly once, also when {@code ports} is empty.
-   *
-   * @param operator the operator to add
-   * @param ports the output port that produces each collection
-   */
-  public void addOperator(Operator operator, Map<PCollection<?>, OutputPort<?>> ports) {
-    registerOperator(operator);
-    for (Map.Entry<PCollection<?>, OutputPort<?>> portEntry : ports.entrySet()) {
-      registerOutput(portEntry.getKey(), portEntry.getValue());
-    }
-  }
-
-  /**
-   * Add the operator with its output port for the given result {@link PCollection}.
-   *
-   * @param operator the operator to add
-   * @param port the operator's output port
-   * @param output the collection produced on that port
-   */
-  public void addOperator(Operator operator, OutputPort port, PCollection output) {
-    registerOperator(operator);
-    registerOutput(output, port);
-  }
-
-  /**
-   * Stores the operator under a name that is unique within this context.
-   */
-  private void registerOperator(Operator operator) {
-    // Apex DAG requires a unique operator name
-    // use the transform's name and make it unique by appending a counter
-    String baseName = getCurrentTransform().getFullName();
-    String name = baseName;
-    for (int i = 1; this.operators.containsKey(name); i++) {
-      name = baseName + i;
-    }
-    this.operators.put(name, operator);
-  }
-
-  /**
-   * Records the port producing the collection, initially without any consumers.
-   */
-  private void registerOutput(PCollection output, OutputPort port) {
-    this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>()));
-  }
-
-  /**
-   * Attach a consuming input port to the stream that carries the given input.
-   *
-   * @param input the collection to consume
-   * @param inputPort the port that receives the collection's elements
-   * @throws IllegalArgumentException if no operator was registered as producer of the input
-   */
-  public void addStream(PInput input, InputPort inputPort) {
-    Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
-    checkArgument(stream != null, "no upstream operator defined for %s", input);
-    stream.getRight().add(inputPort);
-  }
-
-  /**
-   * Adds all registered operators to the DAG and creates a stream, named "stream" followed by a
-   * running index, for every collection that has at least one consumer. Each consuming port gets
-   * a stream codec derived from the collection's coder.
-   *
-   * @param dag the DAG to populate
-   */
-  public void populateDAG(DAG dag) {
-    for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) {
-      dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());
-    }
-    int streamIndex = 0;
-    for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry
-        : this.streams.entrySet()) {
-      List<InputPort<?>> sinksList = streamEntry.getValue().getRight();
-      if (sinksList.isEmpty()) {
-        // no consumer attached, hence no stream to create
-        continue;
-      }
-      InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]);
-      dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks);
-
-      // the coder depends on the collection only, build it once for all sinks
-      Coder<Object> wrapperCoder = streamTupleCoder(streamEntry.getKey());
-      for (InputPort port : sinks) {
-        CoderAdapterStreamCodec streamCodec = new CoderAdapterStreamCodec(wrapperCoder);
-        dag.setInputPortAttribute(port, PortContext.STREAM_CODEC, streamCodec);
-      }
-    }
-  }
-
-  /**
-   * Builds the coder for the tuples of the stream that carries the given collection: the
-   * element coder, wrapped into a windowed value coder when a windowing strategy is set, wrapped
-   * into the stream tuple coder.
-   */
-  private static Coder<Object> streamTupleCoder(PCollection pc) {
-    Coder coder = pc.getCoder();
-    if (pc.getWindowingStrategy() != null) {
-      coder = FullWindowedValueCoder.of(pc.getCoder(),
-          pc.getWindowingStrategy().getWindowFn().windowCoder());
-    }
-    Coder<Object> wrapperCoder = ApexStreamTuple.ApexStreamTupleCoder.of(coder);
-    return wrapperCoder;
-  }
-
-  /**
-   * Return the {@link StateInternalsFactory} for the pipeline translation.
-   *
-   * @return a new factory instance
-   */
-  public <K> StateInternalsFactory<K> stateInternalsFactory() {
-    return new ApexStateInternals.ApexStateInternalsFactory();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
deleted file mode 100644
index 703b1f4..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.functions;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.WatermarkTuple;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Apex operator for Beam {@link Flatten.FlattenPCollectionList}.
- */
-public class ApexFlattenOperator<InputT> extends BaseOperator {
-
- private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
- private boolean traceTuples = false;
-
- private long inputWM1;
- private long inputWM2;
- private long outputWM;
-
- public int data1Tag;
- public int data2Tag;
-
- /**
- * Data input port 1.
- */
- public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 =
- new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
- /**
- * Emits to port "out"
- */
- @Override
- public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
- if (tuple instanceof WatermarkTuple) {
- WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
- if (wmTuple.getTimestamp() > inputWM1) {
- inputWM1 = wmTuple.getTimestamp();
- if (inputWM1 <= inputWM2) {
- // move output watermark and emit it
- outputWM = inputWM1;
- if (traceTuples) {
- LOG.debug("\nemitting watermark {}\n", outputWM);
- }
- out.emit(tuple);
- }
- }
- return;
- }
- if (traceTuples) {
- LOG.debug("\nemitting {}\n", tuple);
- }
-
- if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
- ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data1Tag);
- }
- out.emit(tuple);
- }
- };
-
- /**
- * Data input port 2.
- */
- public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 =
- new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
- /**
- * Emits to port "out"
- */
- @Override
- public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
- if (tuple instanceof WatermarkTuple) {
- WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
- if (wmTuple.getTimestamp() > inputWM2) {
- inputWM2 = wmTuple.getTimestamp();
- if (inputWM2 <= inputWM1) {
- // move output watermark and emit it
- outputWM = inputWM2;
- if (traceTuples) {
- LOG.debug("\nemitting watermark {}\n", outputWM);
- }
- out.emit(tuple);
- }
- }
- return;
- }
- if (traceTuples) {
- LOG.debug("\nemitting {}\n", tuple);
- }
-
- if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
- ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data2Tag);
- }
- out.emit(tuple);
- }
- };
-
- /**
- * Output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out =
- new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
deleted file mode 100644
index 4c28c85..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.functions;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Apex operator for Beam {@link GroupByKey}.
- * This operator expects the input stream already partitioned by K,
- * which is determined by the {@link StreamCodec} on the input port.
- *
- * <p>State and pending timers are tracked per key, where a key is identified by the bytes
- * produced by its coder.
- *
- * @param <K> type of the keys
- * @param <V> type of the values that are grouped
- */
-public class ApexGroupByKeyOperator<K, V> implements Operator {
-  private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
-  // replaced in setup() according to the pipeline options
-  private boolean traceTuples = true;
-
-  @Bind(JavaSerializer.class)
-  private WindowingStrategy<V, BoundedWindow> windowingStrategy;
-  @Bind(JavaSerializer.class)
-  private Coder<K> keyCoder;
-  @Bind(JavaSerializer.class)
-  private Coder<V> valueCoder;
-
-  @Bind(JavaSerializer.class)
-  private final SerializablePipelineOptions serializedOptions;
-  @Bind(JavaSerializer.class)
-  private final StateInternalsFactory<K> stateInternalsFactory;
-  // both maps are keyed by the encoded key, see encodeKey
-  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
-  private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
-  private transient ProcessContext context;
-  private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
-  private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
-  private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  /**
-   * Input port. Watermark tuples fire the timers that became ready and are then forwarded to
-   * the output port, all other tuples are processed as elements.
-   */
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
-      new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() {
-    @Override
-    public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) {
-      try {
-        if (t instanceof ApexStreamTuple.WatermarkTuple) {
-          ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>) t;
-          processWatermark(mark);
-          if (traceTuples) {
-            LOG.debug("\nemitting watermark {}\n", mark.getTimestamp());
-          }
-          output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(
-              mark.getTimestamp()));
-          return;
-        }
-        if (traceTuples) {
-          LOG.debug("\ninput {}\n", t.getValue());
-        }
-        processElement(t.getValue());
-      } catch (Exception e) {
-        // rethrow unchecked exceptions as they are, wrap everything else
-        Throwables.propagateIfPossible(e);
-        throw new RuntimeException(e);
-      }
-    }
-  };
-
-  /**
-   * Output port for the grouped results and the forwarded watermarks.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>>
-      output = new DefaultOutputPort<>();
-
-  /**
-   * Creates the operator for the given input collection.
-   *
-   * @param pipelineOptions the pipeline options, must not be null
-   * @param input the collection to group; supplies windowing strategy, key and value coder
-   * @param stateInternalsFactory creates the state internals for keys seen for the first time
-   */
-  @SuppressWarnings("unchecked")
-  public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input,
-      StateInternalsFactory<K> stateInternalsFactory) {
-    checkNotNull(pipelineOptions);
-    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
-    this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy();
-    this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
-    this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
-    this.stateInternalsFactory = stateInternalsFactory;
-  }
-
-  @SuppressWarnings("unused") // for Kryo
-  private ApexGroupByKeyOperator() {
-    this.serializedOptions = null;
-    this.stateInternalsFactory = null;
-  }
-
-  @Override
-  public void beginWindow(long l) {
-  }
-
-  @Override
-  public void endWindow() {
-  }
-
-  @Override
-  public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
-    // hand the function a factory that goes through the per key cache of this operator
-    StateInternalsFactory<K> cachingFactory = new GroupByKeyStateInternalsFactory();
-    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy,
-        cachingFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
-    this.context = new ProcessContext(fn, this.timerInternals);
-  }
-
-  @Override
-  public void teardown() {
-  }
-
-  /**
-   * Returns the list of timers that are ready to fire. These are the timers
-   * that are registered to be triggered at a time before the current watermark.
-   * We keep these timers in a Set, so that they are deduplicated, as the same
-   * timer can be registered multiple times.
-   *
-   * <p>The returned timers are removed from the active timers.
-   */
-  private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(
-      long currentWatermark) {
-
-    // we keep the timers to return in a different list and launch them later
-    // because we cannot prevent a trigger from registering another trigger,
-    // which would lead to concurrent modification exception.
-    Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();
-
-    Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it =
-        activeTimers.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
-
-      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
-      while (timerIt.hasNext()) {
-        TimerInternals.TimerData timerData = timerIt.next();
-        if (timerData.getTimestamp().isBefore(currentWatermark)) {
-          toFire.put(keyWithTimers.getKey(), timerData);
-          timerIt.remove();
-        }
-      }
-
-      // drop keys that have no timers left
-      if (keyWithTimers.getValue().isEmpty()) {
-        it.remove();
-      }
-    }
-    return toFire;
-  }
-
-  /**
-   * Passes the element to the function as a work item for its key, keeping timestamp, windows
-   * and pane of the incoming windowed value.
-   */
-  private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
-    final KV<K, V> kv = windowedValue.getValue();
-    final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(),
-        windowedValue.getTimestamp(),
-        windowedValue.getWindows(),
-        windowedValue.getPane());
-
-    KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
-        kv.getKey(),
-        Collections.singletonList(updatedWindowedValue));
-
-    context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-    fn.processElement(context);
-  }
-
-  /**
-   * Encodes the key with the key coder; the result identifies the key in the per key maps.
-   *
-   * @throws RuntimeException wrapping the {@link CoderException} if the key cannot be encoded
-   */
-  private ByteBuffer encodeKey(K key) {
-    try {
-      return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Returns the state internals for the key, creating them on first access.
-   */
-  private StateInternals<K> getStateInternalsForKey(K key) {
-    final ByteBuffer keyBytes = encodeKey(key);
-    StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
-    if (stateInternals == null) {
-      stateInternals = stateInternalsFactory.stateInternalsForKey(key);
-      perKeyStateInternals.put(keyBytes, stateInternals);
-    }
-    return stateInternals;
-  }
-
-  /**
-   * Adds the timer to the active timers of the key.
-   */
-  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
-    final ByteBuffer keyBytes = encodeKey(key);
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
-    if (timersForKey == null) {
-      timersForKey = new HashSet<>();
-      activeTimers.put(keyBytes, timersForKey);
-    }
-    timersForKey.add(timer);
-  }
-
-  /**
-   * Removes the timer from the active timers of the key, if present. The key's entry is removed
-   * along with its last timer.
-   */
-  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
-    final ByteBuffer keyBytes = encodeKey(key);
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
-    if (timersForKey != null) {
-      timersForKey.remove(timer);
-      if (timersForKey.isEmpty()) {
-        activeTimers.remove(keyBytes);
-      }
-    }
-  }
-
-  /**
-   * Records the new input watermark and passes the timers that became ready to the function,
-   * one timers work item per key.
-   */
-  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
-    this.inputWatermark = new Instant(mark.getTimestamp());
-    Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(
-        mark.getTimestamp());
-    for (ByteBuffer keyBytes : timers.keySet()) {
-      K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
-      KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
-      context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-      fn.processElement(context);
-    }
-  }
-
-  /**
-   * Process context handed to the function. It carries the current work item with the state of
-   * its key and emits the function's windowed results on the operator's output port.
-   */
-  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?,
-      KeyedWorkItem<K, V>>.ProcessContext {
-
-    private final ApexTimerInternals timerInternals;
-    private StateInternals<K> stateInternals;
-    private KeyedWorkItem<K, V> element;
-
-    public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
-        ApexTimerInternals timerInternals) {
-      function.super();
-      this.timerInternals = checkNotNull(timerInternals);
-    }
-
-    /**
-     * Sets the work item to process next along with the state of its key.
-     */
-    public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
-      this.element = element;
-      this.stateInternals = stateForKey;
-    }
-
-    @Override
-    public KeyedWorkItem<K, V> element() {
-      return this.element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      throw new UnsupportedOperationException(
-          "timestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return serializedOptions.get();
-    }
-
-    @Override
-    public void output(KV<K, Iterable<V>> output) {
-      throw new UnsupportedOperationException(
-          "output() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) {
-      throw new UnsupportedOperationException(
-          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      throw new UnsupportedOperationException(
-          "pane() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(
-          "window() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() {
-      return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() {
-
-        @Override
-        public StateInternals<K> stateInternals() {
-          return stateInternals;
-        }
-
-        @Override
-        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp,
-            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          if (traceTuples) {
-            LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
-          }
-          ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(
-              WindowedValue.of(output, timestamp, windows, pane)));
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return timerInternals;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
-        }
-
-        @Override
-        public PaneInfo pane() {
-          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
-            Coder<T> elemCoder) throws IOException {
-          throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          throw new RuntimeException("sideInput() is not available in Streaming mode.");
-        }
-      };
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      // side outputs are rejected rather than silently dropped
-      throw new RuntimeException("sideOutput() is not available when grouping by window.");
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /**
-   * An implementation of Beam's {@link TimerInternals}.
-   *
-   * <p>Timers are set and deleted for the key of the work item that is currently processed.
-   * Setting or deleting timers by ID is not supported.
-   */
-  public class ApexTimerInternals implements TimerInternals {
-
-    @Override
-    public void setTimer(TimerData timerKey) {
-      registerActiveTimer(context.element().key(), timerKey);
-    }
-
-    @Override
-    public void deleteTimer(TimerData timerKey) {
-      unregisterActiveTimer(context.element().key(), timerKey);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return Instant.now();
-    }
-
-    @Override
-    public Instant currentSynchronizedProcessingTime() {
-      // TODO not implemented, callers receive null
-      return null;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return inputWatermark;
-    }
-
-    @Override
-    public Instant currentOutputWatermarkTime() {
-      // TODO not implemented, callers receive null
-      return null;
-    }
-
-    @Override
-    public void setTimer(StateNamespace namespace, String timerId, Instant target,
-        TimeDomain timeDomain) {
-      throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
-    }
-
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId) {
-      throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
-    }
-
-  }
-
-  /**
-   * Factory given to the function; it resolves state through the operator's per key cache.
-   */
-  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
-      return getStateInternalsForKey(key);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
deleted file mode 100644
index 43384d6..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.functions;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
-import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
-import org.apache.beam.runners.apex.translators.utils.ValueAndCoderKryoSerializable;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Apex operator for Beam {@link DoFn}.
- *
- * <p>Main-input elements are run through a {@link PushbackSideInputDoFnRunner}. Elements the
- * runner hands back are buffered in {@code pushedBack} and retried each time a side input value
- * arrives. When side inputs are configured, the emitted watermark is the minimum of the input
- * watermark and the smallest timestamp among the buffered elements; without side inputs the
- * input watermark is forwarded unchanged.
- *
- * <p>At most five side outputs are supported, one per {@code sideOutputN} port.
- */
-public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
-  private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
-  // Overwritten in setup() from the pipeline options.
-  private boolean traceTuples = true;
-
-  @Bind(JavaSerializer.class)
-  private final SerializablePipelineOptions pipelineOptions;
-  @Bind(JavaSerializer.class)
-  private final OldDoFn<InputT, OutputT> doFn;
-  @Bind(JavaSerializer.class)
-  private final TupleTag<OutputT> mainOutputTag;
-  @Bind(JavaSerializer.class)
-  private final List<TupleTag<?>> sideOutputTags;
-  @Bind(JavaSerializer.class)
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  @Bind(JavaSerializer.class)
-  private final List<PCollectionView<?>> sideInputs;
-
-  private final StateInternals<Void> sideInputStateInternals;
-  // Main-input elements handed back by the runner, kept together with their coder.
-  private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
-  // Smallest timestamp among the buffered elements; Long.MAX_VALUE when nothing is buffered.
-  private LongMin pushedBackWatermark = new LongMin();
-  private long currentInputWatermark = Long.MIN_VALUE;
-  private long currentOutputWatermark = currentInputWatermark;
-
-  private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
-  private transient SideInputHandler sideInputHandler;
-  private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
-      Maps.newHashMapWithExpectedSize(5);
-
-  /**
-   * Creates the operator.
-   *
-   * @param pipelineOptions options, wrapped in a {@link SerializablePipelineOptions}
-   * @param doFn the user function to execute
-   * @param mainOutputTag tag of the main output
-   * @param sideOutputTags tags of the side outputs; at most {@code sideOutputPorts.length}
-   * @param windowingStrategy windowing strategy passed to the {@link DoFnRunner}
-   * @param sideInputs side input views; may be empty
-   * @param inputCoder coder for main-input elements, used to store pushed-back elements
-   * @param stateInternalsFactory queried once (with a {@code null} key) for the side input state
-   * @throws UnsupportedOperationException if there are more side output tags than ports
-   */
-  public ApexParDoOperator(
-      ApexPipelineOptions pipelineOptions,
-      OldDoFn<InputT, OutputT> doFn,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      WindowingStrategy<?, ?> windowingStrategy,
-      List<PCollectionView<?>> sideInputs,
-      Coder<WindowedValue<InputT>> inputCoder,
-      StateInternalsFactory<Void> stateInternalsFactory
-      ) {
-    this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
-    this.doFn = doFn;
-    this.mainOutputTag = mainOutputTag;
-    this.sideOutputTags = sideOutputTags;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputs = sideInputs;
-    this.sideInputStateInternals = stateInternalsFactory.stateInternalsForKey(null);
-
-    if (sideOutputTags.size() > sideOutputPorts.length) {
-      String msg = String.format("Too many side outputs (currently only supporting %s).",
-          sideOutputPorts.length);
-      throw new UnsupportedOperationException(msg);
-    }
-
-    Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder);
-    this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(),
-        coder);
-
-  }
-
-  @SuppressWarnings("unused") // for Kryo
-  private ApexParDoOperator() {
-    this.pipelineOptions = null;
-    this.doFn = null;
-    this.mainOutputTag = null;
-    this.sideOutputTags = null;
-    this.windowingStrategy = null;
-    this.sideInputs = null;
-    this.pushedBack = null;
-    this.sideInputStateInternals = null;
-  }
-
-  /**
-   * Main input. Watermark tuples go to {@link #processWatermark}; data tuples are processed
-   * immediately and whatever the runner hands back is buffered for a later retry.
-   */
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input =
-      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
-    @Override
-    public void process(ApexStreamTuple<WindowedValue<InputT>> t) {
-      if (t instanceof ApexStreamTuple.WatermarkTuple) {
-        processWatermark((ApexStreamTuple.WatermarkTuple<?>) t);
-      } else {
-        if (traceTuples) {
-          LOG.debug("\ninput {}\n", t.getValue());
-        }
-        Iterable<WindowedValue<InputT>> justPushedBack =
-            processElementInReadyWindows(t.getValue());
-        for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
-          pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
-          pushedBack.get().add(pushedBackValue);
-        }
-      }
-    }
-  };
-
-  /**
-   * Side input. Each data tuple is added to the {@link SideInputHandler}, after which all
-   * buffered main-input elements are retried and the buffer is rebuilt from those still
-   * handed back. Watermark tuples on this port are ignored.
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 =
-      new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() {
-    @Override
-    public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) {
-      if (t instanceof ApexStreamTuple.WatermarkTuple) {
-        // ignore side input watermarks
-        return;
-      }
-
-      // The union tag of a data tuple selects which side input view the value belongs to.
-      int sideInputIndex = 0;
-      if (t instanceof ApexStreamTuple.DataTuple) {
-        sideInputIndex = ((ApexStreamTuple.DataTuple<?>) t).getUnionTag();
-      }
-
-      if (traceTuples) {
-        LOG.debug("\nsideInput {} {}\n", sideInputIndex, t.getValue());
-      }
-
-      PCollectionView<?> sideInput = sideInputs.get(sideInputIndex);
-      sideInputHandler.addSideInputValue(sideInput, t.getValue());
-
-      // Retry everything that was buffered; collect what is handed back again.
-      List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-      for (WindowedValue<InputT> elem : pushedBack.get()) {
-        Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem);
-        Iterables.addAll(newPushedBack, justPushedBack);
-      }
-
-      pushedBack.get().clear();
-      pushedBackWatermark.clear();
-      for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
-        pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
-        pushedBack.get().add(pushedBackValue);
-      }
-
-      // potentially emit watermark
-      processWatermark(ApexStreamTuple.WatermarkTuple.of(currentInputWatermark));
-    }
-  };
-
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
-
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 =
-      new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 =
-      new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 =
-      new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 =
-      new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 =
-      new DefaultOutputPort<>();
-
-  // Must stay declared after sideOutput1..5: the initializer reads those fields.
-  public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2,
-      sideOutput3, sideOutput4, sideOutput5};
-
-  /**
-   * Emits {@code tuple} on the side output port mapped to {@code tag}, or on the main
-   * {@code output} port when no port is mapped to that tag.
-   */
-  @Override
-  public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
-    DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag);
-    if (sideOutputPort != null) {
-      sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
-    } else {
-      output.emit(ApexStreamTuple.DataTuple.of(tuple));
-    }
-    if (traceTuples) {
-      LOG.debug("\nemitting {}\n", tuple);
-    }
-  }
-
-  /**
-   * Runs {@code elem} through the pushback runner inside its own start/finish bundle pair.
-   *
-   * @return the elements the runner handed back instead of processing
-   * @throws UserCodeException rethrown from the runner; an {@link AssertionError} cause is first
-   *     recorded in {@code ApexRunner.assertionError}
-   */
-  private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
-    try {
-      pushbackDoFnRunner.startBundle();
-      Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
-          .processElementInReadyWindows(elem);
-      pushbackDoFnRunner.finishBundle();
-      return pushedBack;
-    } catch (UserCodeException ue) {
-      if (ue.getCause() instanceof AssertionError) {
-        ApexRunner.assertionError = (AssertionError) ue.getCause();
-      }
-      throw ue;
-    }
-  }
-
-  /**
-   * Records the input watermark and emits an output watermark where appropriate: without side
-   * inputs {@code mark} is forwarded as is; with side inputs a watermark is emitted only when
-   * min(buffered-element watermark, input watermark) exceeds the last emitted value.
-   */
-  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
-    this.currentInputWatermark = mark.getTimestamp();
-
-    if (sideInputs.isEmpty()) {
-      if (traceTuples) {
-        LOG.debug("\nemitting watermark {}\n", mark);
-      }
-      output.emit(mark);
-      return;
-    }
-
-    long potentialOutputWatermark =
-        Math.min(pushedBackWatermark.get(), currentInputWatermark);
-    if (potentialOutputWatermark > currentOutputWatermark) {
-      currentOutputWatermark = potentialOutputWatermark;
-      if (traceTuples) {
-        LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
-      }
-      output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
-    }
-  }
-
-  /**
-   * Builds the runner chain, maps side output tags to ports (by list position) and calls
-   * {@code doFn.setup()}. Checked exceptions from the user function are wrapped in a
-   * {@link RuntimeException}.
-   */
-  @Override
-  public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
-    SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
-    if (!sideInputs.isEmpty()) {
-      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
-      sideInputReader = sideInputHandler;
-    }
-
-    for (int i = 0; i < sideOutputTags.size(); i++) {
-      @SuppressWarnings("unchecked")
-      DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)
-          sideOutputPorts[i];
-      sideOutputPortMapping.put(sideOutputTags.get(i), port);
-    }
-
-    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
-        pipelineOptions.get(),
-        doFn,
-        sideInputReader,
-        this,
-        mainOutputTag,
-        sideOutputTags,
-        new NoOpStepContext(),
-        new NoOpAggregatorFactory(),
-        windowingStrategy
-        );
-
-    pushbackDoFnRunner =
-        PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
-
-    try {
-      doFn.setup();
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw new RuntimeException(e);
-    }
-
-  }
-
-  /**
-   * Calls {@code doFn.teardown()} so that whatever the user function acquired in
-   * {@code setup()} is released, then delegates to the base operator. Checked exceptions from
-   * the user function are wrapped in a {@link RuntimeException}, mirroring {@link #setup}.
-   */
-  @Override
-  public void teardown() {
-    try {
-      doFn.teardown();
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw new RuntimeException(e);
-    } finally {
-      super.teardown();
-    }
-  }
-
-  @Override
-  public void beginWindow(long windowId) {
-  }
-
-  @Override
-  public void endWindow() {
-  }
-
-  /**
-   * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
-   * It is called from {@link org.apache.beam.sdk.util.SimpleDoFnRunner}.
-   */
-  public static class NoOpAggregatorFactory implements AggregatorFactory {
-
-    private NoOpAggregatorFactory() {
-    }
-
-    /** Returns a fresh aggregator that discards every value; all arguments are ignored. */
-    @Override
-    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-        Class<?> fnClass, ExecutionContext.StepContext step,
-        String name, CombineFn<InputT, AccumT, OutputT> combine) {
-      return new NoOpAggregator<InputT, OutputT>();
-    }
-
-    /** Aggregator that drops added values and reports neither a name nor a combine function. */
-    private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
-        java.io.Serializable {
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public void addValue(InputT value) {
-      }
-
-      /** Not implemented; returns {@code null}. */
-      @Override
-      public String getName() {
-        return null;
-      }
-
-      /** Not implemented; returns {@code null}. */
-      @Override
-      public CombineFn<InputT, ?, OutputT> getCombineFn() {
-        return null;
-      }
-
-    }
-  }
-
-  /** Running minimum over {@code long} values; {@link Long#MAX_VALUE} while empty. */
-  private static class LongMin {
-    long state = Long.MAX_VALUE;
-
-    public void add(long l) {
-      state = Math.min(state, l);
-    }
-
-    public long get() {
-      return state;
-    }
-
-    public void clear() {
-      state = Long.MAX_VALUE;
-    }
-
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
deleted file mode 100644
index ecb0adb..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Apex operators that execute Beam functions (such as {@code DoFn}) for the Apache Apex runner.
- */
-package org.apache.beam.runners.apex.translators.functions;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
deleted file mode 100644
index 61236ca..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators.io;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
-
-import java.io.IOException;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple;
-import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Apex input operator that wraps Beam {@link UnboundedSource}.
- *
- * <p>A reader is created and started in {@link #setup}; {@link #emitTuples} emits at most one
- * element per call, and {@link #beginWindow} emits a watermark whenever it has advanced past
- * the last one emitted.
- */
-public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
-    extends UnboundedSource.CheckpointMark> implements InputOperator {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      ApexReadUnboundedInputOperator.class);
-  private boolean traceTuples = false;
-  // Last watermark emitted; only strictly larger values are sent downstream.
-  private long outputWatermark = 0;
-
-  @Bind(JavaSerializer.class)
-  private final SerializablePipelineOptions pipelineOptions;
-  @Bind(JavaSerializer.class)
-  private final UnboundedSource<OutputT, CheckpointMarkT> source;
-  private final boolean isBoundedSource;
-  private transient UnboundedSource.UnboundedReader<OutputT> reader;
-  // True while the reader is positioned on an element that has not been emitted yet.
-  private transient boolean available = false;
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output =
-      new DefaultOutputPort<>();
-
-  /** Creates an operator for {@code source}, treating it as not bounded. */
-  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
-      ApexPipelineOptions options) {
-    this(source, false, options);
-  }
-
-  /**
-   * Creates an operator for {@code source}.
-   *
-   * @param isBoundedSource when true, the operator emits a final watermark and shuts down once
-   *     the reader has no element available at the start of a window
-   */
-  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
-      boolean isBoundedSource, ApexPipelineOptions options) {
-    this.pipelineOptions = new SerializablePipelineOptions(options);
-    this.source = source;
-    this.isBoundedSource = isBoundedSource;
-  }
-
-  @SuppressWarnings("unused") // for Kryo
-  private ApexReadUnboundedInputOperator() {
-    this.pipelineOptions = null;
-    this.source = null;
-    this.isBoundedSource = false;
-  }
-
-  /**
-   * Emits the reader's watermark, or - when a finite source (bounded, or a {@link ValuesSource})
-   * has nothing available - the final watermark followed by an operator shutdown.
-   */
-  @Override
-  public void beginWindow(long windowId) {
-    boolean finiteSource = isBoundedSource || source instanceof ValuesSource;
-    if (available || !finiteSource) {
-      emitWatermarkIfNecessary(reader.getWatermark().getMillis());
-      return;
-    }
-    // if it's a Create and the input was consumed, emit final watermark
-    emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
-    // terminate the stream (allows tests to finish faster)
-    BaseOperator.shutdown();
-  }
-
-  /** Emits {@code mark} only if it is strictly greater than the last emitted watermark. */
-  private void emitWatermarkIfNecessary(long mark) {
-    if (mark <= outputWatermark) {
-      return;
-    }
-    outputWatermark = mark;
-    if (traceTuples) {
-      LOG.debug("\nemitting watermark {}\n", mark);
-    }
-    output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark));
-  }
-
-  @Override
-  public void endWindow() {
-  }
-
-  /** Reads the trace flag from the options, then creates and starts the reader. */
-  @Override
-  public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
-    try {
-      // No checkpoint mark is passed in.
-      reader = source.createReader(this.pipelineOptions.get(), null);
-      available = reader.start();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** Closes the reader if one was created. */
-  @Override
-  public void teardown() {
-    if (reader == null) {
-      return;
-    }
-    try {
-      reader.close();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Emits at most one element in the global window. The current element and its timestamp are
-   * captured before the reader is advanced, so {@code available} already describes the next
-   * element by the time the tuple is emitted.
-   */
-  @Override
-  public void emitTuples() {
-    try {
-      if (!available) {
-        available = reader.advance();
-      }
-      if (!available) {
-        return;
-      }
-      OutputT element = reader.getCurrent();
-      Instant elementTimestamp = reader.getCurrentTimestamp();
-      available = reader.advance();
-      if (traceTuples) {
-        LOG.debug("\nemitting '{}' timestamp {}\n", element, elementTimestamp);
-      }
-      WindowedValue<OutputT> windowed = WindowedValue.of(
-          element, elementTimestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-      output.emit(DataTuple.of(windowed));
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw new RuntimeException(e);
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
deleted file mode 100644
index fadf8ec4..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Instant;
-
-/**
- * Unbounded source that reads from a Java {@link Iterable}.
- *
- * <p>The values are encoded once, at construction time, with an {@link IterableCoder} and kept
- * as a byte array; every reader decodes its own copy. The source does not split and does not
- * support checkpointing.
- */
-public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
-  private static final long serialVersionUID = 1L;
-
-  private final byte[] codedValues;
-  private final IterableCoder<T> iterableCoder;
-
-  /**
-   * Encodes {@code values} with an iterable coder built from {@code coder}.
-   *
-   * @throws RuntimeException wrapping the {@link IOException} if encoding fails
-   */
-  public ValuesSource(Iterable<T> values, Coder<T> coder) {
-    this.iterableCoder = IterableCoder.of(coder);
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    try {
-      iterableCoder.encode(values, bos, Context.OUTER);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-    this.codedValues = bos.toByteArray();
-  }
-
-  /** Always returns this source as the single split; {@code desiredNumSplits} is ignored. */
-  @Override
-  public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
-      int desiredNumSplits, PipelineOptions options) throws Exception {
-    return Collections.singletonList(this);
-  }
-
-  /**
-   * Decodes the stored values and returns a reader over them; {@code checkpointMark} is ignored.
-   *
-   * @throws RuntimeException wrapping the {@link IOException} if decoding fails
-   */
-  @Override
-  public UnboundedReader<T> createReader(PipelineOptions options,
-      @Nullable CheckpointMark checkpointMark) {
-    ByteArrayInputStream bis = new ByteArrayInputStream(codedValues);
-    try {
-      Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER);
-      return new ValuesReader<>(values, this);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  /** Returns {@code null}: no checkpoint mark coder is provided. */
-  @Nullable
-  @Override
-  public Coder<CheckpointMark> getCheckpointMarkCoder() {
-    return null;
-  }
-
-  @Override
-  public void validate() {
-  }
-
-  /** Returns the element coder this source was constructed with. */
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return iterableCoder.getElemCoder();
-  }
-
-  /**
-   * Reader that walks the decoded values once. Timestamps and watermarks are the wall-clock
-   * time at the moment of the call.
-   */
-  private static class ValuesReader<T> extends UnboundedReader<T> {
-
-    private final Iterable<T> values;
-    private final UnboundedSource<T, CheckpointMark> source;
-    private Iterator<T> iterator;
-    private T current;
-    // Tracked separately from 'current' so that a null element is still a valid current value.
-    private boolean hasCurrent;
-
-    public ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) {
-      this.values = values;
-      this.source = source;
-    }
-
-    /** Positions the reader on the first value, if any. */
-    @Override
-    public boolean start() throws IOException {
-      return advance();
-    }
-
-    /**
-     * Moves to the next value.
-     *
-     * @return true if a value is now current, false once the values are exhausted
-     */
-    @Override
-    public boolean advance() throws IOException {
-      if (null == iterator) {
-        // Created lazily so that advance() without a prior start() does not hit a null iterator.
-        iterator = values.iterator();
-      }
-      hasCurrent = iterator.hasNext();
-      // Drop the previous element on exhaustion instead of leaving a stale value behind.
-      current = hasCurrent ? iterator.next() : null;
-      return hasCurrent;
-    }
-
-    /**
-     * Returns the current value.
-     *
-     * @throws NoSuchElementException if the reader is not positioned on a value
-     */
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      requireCurrent();
-      return current;
-    }
-
-    /**
-     * Returns the wall-clock time of the call as the timestamp of the current value.
-     *
-     * @throws NoSuchElementException if the reader is not positioned on a value
-     */
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      requireCurrent();
-      return Instant.now();
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    /** Returns the wall-clock time of the call. */
-    @Override
-    public Instant getWatermark() {
-      return Instant.now();
-    }
-
-    /** Returns {@code null}: checkpointing is not supported. */
-    @Override
-    public CheckpointMark getCheckpointMark() {
-      return null;
-    }
-
-    @Override
-    public UnboundedSource<T, ?> getCurrentSource() {
-      return source;
-    }
-
-    /** Throws unless the last start()/advance() call returned true. */
-    private void requireCurrent() {
-      if (!hasCurrent) {
-        throw new NoSuchElementException("Reader is not positioned on an element.");
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
deleted file mode 100644
index 0d17f19..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Apex input operators and sources used by the Beam runner for Apache Apex to read input.
- */
-package org.apache.beam.runners.apex.translators.io;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
deleted file mode 100644
index 7d7c6cc..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation of the Beam runner for Apache Apex.
- */
-package org.apache.beam.runners.apex.translators;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java
deleted file mode 100644
index edc1220..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.Table;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTag.StateBinder;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.joda.time.Instant;
-
-/**
- * Implementation of {@link StateInternals} that can be serialized and
- * checkpointed with the operator.
- *
- * <p>Every state cell is kept as a coder-encoded byte array in a single in-memory table,
- * addressed by the namespace's string key and the state tag's id. This is suitable for small
- * states; in the future this should be based on the incremental state saving components in
- * the Apex library.
- *
- * @param <K> type of the key the state is associated with
- */
-@DefaultSerializer(JavaSerializer.class)
-public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * Creates state internals for the given key with no state stored yet.
-   *
-   * @param key the key the returned state is associated with
-   * @return a new, empty {@link ApexStateInternals}
-   */
-  public static <K> ApexStateInternals<K> forKey(K key) {
-    return new ApexStateInternals<>(key);
-  }
-
-  private final K key;
-
-  /**
-   * Serializable state for internals (namespace -> state tag -> coded value).
-   */
-  private final Table<String, String, byte[]> stateTable = HashBasedTable.create();
-
-  protected ApexStateInternals(K key) {
-    this.key = key;
-  }
-
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  /**
-   * Binds the state for the given namespace and tag using the null {@link StateContext}.
-   */
-  @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
-    return state(namespace, address, StateContexts.nullContext());
-  }
-
-  /**
-   * Binds the state for the given namespace and tag. The returned object is a thin view: all
-   * reads and writes go through to the table held by this instance.
-   */
-  @Override
-  public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
-    return address.bind(new ApexStateBinder(key, namespace, address, c));
-  }
-
-  /**
-   * A {@link StateBinder} that returns {@link State} wrappers for serialized state.
-   */
-  private class ApexStateBinder implements StateBinder<K> {
-    private final K key;
-    private final StateNamespace namespace;
-    private final StateContext<?> c;
-
-    private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super K, ?> address,
-        StateContext<?> c) {
-      this.key = key;
-      this.namespace = namespace;
-      this.c = c;
-    }
-
-    @Override
-    public <T> ValueState<T> bindValue(
-        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
-      return new ApexValueState<T>(namespace, address, coder);
-    }
-
-    @Override
-    public <T> BagState<T> bindBag(
-        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
-      return new ApexBagState<T>(namespace, address, elemCoder);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            final CombineFn<InputT, AccumT, OutputT> combineFn) {
-      // The un-keyed combine function is adapted so that a single state class serves both
-      // the keyed and the un-keyed binding.
-      return new ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>(
-          namespace,
-          address,
-          accumCoder,
-          key,
-          combineFn.<K>asKeyedFn());
-    }
-
-    @Override
-    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
-      return new ApexWatermarkHoldState<W>(namespace, address, outputTimeFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>(
-          namespace,
-          address,
-          accumCoder,
-          key,
-          combineFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-      return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
-    }
-  }
-
-  /**
-   * Base class of all state views: reads, writes and clears one cell of the state table,
-   * identified by namespace and tag, using the given coder.
-   */
-  private class AbstractState<T> {
-    protected final StateNamespace namespace;
-    protected final StateTag<?, ? extends State> address;
-    protected final Coder<T> coder;
-
-    private AbstractState(
-        StateNamespace namespace,
-        StateTag<?, ? extends State> address,
-        Coder<T> coder) {
-      this.namespace = namespace;
-      this.address = address;
-      this.coder = coder;
-    }
-
-    /**
-     * Decodes the stored value of this cell.
-     *
-     * @return the decoded value, or {@code null} when nothing is stored for this cell
-     * @throws RuntimeException wrapping the {@link IOException} if decoding fails
-     */
-    protected T readValue() {
-      byte[] buf = stateTable.get(namespace.stringKey(), address.getId());
-      if (buf == null) {
-        return null;
-      }
-      // TODO: reuse input
-      Input input = new Input(buf);
-      try {
-        return coder.decode(input, Context.OUTER);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    /**
-     * Encodes the value and stores it in this cell, replacing any previous content.
-     *
-     * @throws RuntimeException wrapping the {@link IOException} if encoding fails
-     */
-    public void writeValue(T input) {
-      ByteArrayOutputStream output = new ByteArrayOutputStream();
-      try {
-        coder.encode(input, output, Context.OUTER);
-        stateTable.put(namespace.stringKey(), address.getId(), output.toByteArray());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    /**
-     * Removes this cell from the state table.
-     */
-    public void clear() {
-      stateTable.remove(namespace.stringKey(), address.getId());
-    }
-
-    /**
-     * Returns a view that reports, each time it is read, whether the table currently holds
-     * no entry for this cell.
-     */
-    protected ReadableState<Boolean> emptinessState() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-
-        @Override
-        public Boolean read() {
-          return stateTable.get(namespace.stringKey(), address.getId()) == null;
-        }
-      };
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      AbstractState<?> that = (AbstractState<?>) o;
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  /**
-   * {@link ValueState} view of a single table cell.
-   */
-  private class ApexValueState<T> extends AbstractState<T> implements ValueState<T> {
-
-    private ApexValueState(
-        StateNamespace namespace,
-        StateTag<?, ValueState<T>> address,
-        Coder<T> coder) {
-      super(namespace, address, coder);
-    }
-
-    @Override
-    public ApexValueState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public T read() {
-      return readValue();
-    }
-
-    @Override
-    public void write(T input) {
-      writeValue(input);
-    }
-  }
-
-  /**
-   * {@link WatermarkHoldState} view that stores the combined hold as a single {@link Instant}.
-   */
-  private final class ApexWatermarkHoldState<W extends BoundedWindow>
-      extends AbstractState<Instant> implements WatermarkHoldState<W> {
-
-    private final OutputTimeFn<? super W> outputTimeFn;
-
-    public ApexWatermarkHoldState(
-        StateNamespace namespace,
-        StateTag<?, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
-      super(namespace, address, InstantCoder.of());
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    @Override
-    public ApexWatermarkHoldState<W> readLater() {
-      return this;
-    }
-
-    @Override
-    public Instant read() {
-      return readValue();
-    }
-
-    /**
-     * Stores the given time when no hold exists yet; otherwise stores the result of combining
-     * the existing hold with the given time through the {@link OutputTimeFn}.
-     */
-    @Override
-    public void add(Instant outputTime) {
-      Instant combined = read();
-      combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
-      writeValue(combined);
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return emptinessState();
-    }
-
-    @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
-    }
-
-  }
-
-  /**
-   * {@link AccumulatorCombiningState} view that stores the encoded accumulator in one cell.
-   */
-  private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>
-      extends AbstractState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
-    private final K key;
-    private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
-
-    private ApexAccumulatorCombiningState(StateNamespace namespace,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        Coder<AccumT> coder,
-        K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      super(namespace, address, coder);
-      this.key = key;
-      this.combineFn = combineFn;
-    }
-
-    @Override
-    public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public OutputT read() {
-      return combineFn.extractOutput(key, getAccum());
-    }
-
-    /**
-     * Adds the input to the current accumulator and stores the result.
-     */
-    @Override
-    public void add(InputT input) {
-      // Store the accumulator returned by addInput: a combine function may return a new
-      // accumulator rather than mutate the one passed in, in which case writing back the
-      // old instance would silently drop the input.
-      AccumT accum = combineFn.addInput(key, getAccum(), input);
-      writeValue(accum);
-    }
-
-    /**
-     * Returns the stored accumulator, or a freshly created one when nothing is stored.
-     */
-    @Override
-    public AccumT getAccum() {
-      AccumT accum = readValue();
-      if (accum == null) {
-        accum = combineFn.createAccumulator(key);
-      }
-      return accum;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return emptinessState();
-    }
-
-    /**
-     * Merges the given accumulator with the current one and stores the result.
-     */
-    @Override
-    public void addAccum(AccumT accum) {
-      accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum));
-      writeValue(accum);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators);
-    }
-
-  }
-
-  /**
-   * {@link BagState} view that stores the whole bag as one encoded list.
-   */
-  private final class ApexBagState<T> extends AbstractState<List<T>> implements BagState<T> {
-    private ApexBagState(
-        StateNamespace namespace,
-        StateTag<?, BagState<T>> address,
-        Coder<T> coder) {
-      super(namespace, address, ListCoder.of(coder));
-    }
-
-    @Override
-    public ApexBagState<T> readLater() {
-      return this;
-    }
-
-    /**
-     * Returns the stored elements, or a new empty list when nothing is stored.
-     */
-    @Override
-    public List<T> read() {
-      List<T> value = super.readValue();
-      if (value == null) {
-        value = new ArrayList<T>();
-      }
-      return value;
-    }
-
-    @Override
-    public void add(T input) {
-      // The complete list is decoded, extended and encoded again on every call.
-      List<T> value = read();
-      value.add(input);
-      writeValue(value);
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return emptinessState();
-    }
-  }
-
-  /**
-   * Factory for {@link ApexStateInternals}.
-   *
-   * @param <K> type of the keys for which state internals are created
-   */
-  public static class ApexStateInternalsFactory<K>
-      implements StateInternalsFactory<K>, Serializable {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
-      return ApexStateInternals.forKey(key);
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
deleted file mode 100644
index 25518dc..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.datatorrent.api.Operator;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-
-/**
- * The common interface for all objects transmitted through streams.
- *
- * @param <T> The actual payload type.
- */
-public interface ApexStreamTuple<T> {
-  /**
-   * Gets the value of the tuple.
-   *
-   * @return the payload carried by this tuple; {@code null} for tuples created without one
-   */
-  T getValue();
-
-  /**
-   * Data tuple class.
-   *
-   * @param <T> the payload type
-   */
-  class DataTuple<T> implements ApexStreamTuple<T> {
-    private int unionTag;
-    private T value;
-
-    /**
-     * Creates a data tuple with the given value and union tag 0.
-     */
-    public static <T> DataTuple<T> of(T value) {
-      return new DataTuple<>(value, 0);
-    }
-
-    private DataTuple(T value, int unionTag) {
-      this.value = value;
-      this.unionTag = unionTag;
-    }
-
-    @Override
-    public T getValue() {
-      return value;
-    }
-
-    public void setValue(T value) {
-      this.value = value;
-    }
-
-    public int getUnionTag() {
-      return unionTag;
-    }
-
-    public void setUnionTag(int unionTag) {
-      this.unionTag = unionTag;
-    }
-
-    /**
-     * Returns the string form of the value, or {@code "null"} when the value is {@code null}.
-     */
-    @Override
-    public String toString() {
-      // String.valueOf tolerates a null value, which setValue and the constructors permit.
-      return String.valueOf(value);
-    }
-
-  }
-
-  /**
-   * Tuple that includes a timestamp.
-   *
-   * @param <T> the payload type
-   */
-  class TimestampedTuple<T> extends DataTuple<T> {
-    private long timestamp;
-
-    public TimestampedTuple(long timestamp, T value) {
-      super(value, 0);
-      this.timestamp = timestamp;
-    }
-
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-      this.timestamp = timestamp;
-    }
-
-    /**
-     * Hashes the timestamp only; tuples that are equal also share the timestamp, so this is
-     * consistent with {@link #equals(Object)}.
-     */
-    @Override
-    public int hashCode() {
-      return Objects.hash(timestamp);
-    }
-
-    /**
-     * Two timestamped tuples are equal when both timestamp and value are equal. The union tag
-     * is not compared.
-     */
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof TimestampedTuple)) {
-        return false;
-      } else {
-        TimestampedTuple<?> other = (TimestampedTuple<?>) obj;
-        return (timestamp == other.timestamp) && Objects.equals(this.getValue(), other.getValue());
-      }
-    }
-
-  }
-
-  /**
-   * Tuple that represents a watermark. It carries a timestamp and a {@code null} value.
-   *
-   * @param <T> the payload type of the stream the watermark travels on
-   */
-  class WatermarkTuple<T> extends TimestampedTuple<T> {
-    public static <T> WatermarkTuple<T> of(long timestamp) {
-      return new WatermarkTuple<>(timestamp);
-    }
-
-    protected WatermarkTuple(long timestamp) {
-      super(timestamp, null);
-    }
-
-    @Override
-    public String toString() {
-      return "[Watermark " + getTimestamp() + "]";
-    }
-  }
-
-  /**
-   * Coder for {@link ApexStreamTuple}.
-   *
-   * <p>The encoding starts with a marker byte: {@code 1} for a watermark, followed by the
-   * timestamp as a long; {@code 0} for a data tuple, followed by one byte for the union tag
-   * and the value as written by the value coder.
-   */
-  class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
-    private static final long serialVersionUID = 1L;
-    final Coder<T> valueCoder;
-
-    public static <T> ApexStreamTupleCoder<T> of(Coder<T> valueCoder) {
-      return new ApexStreamTupleCoder<>(valueCoder);
-    }
-
-    protected ApexStreamTupleCoder(Coder<T> valueCoder) {
-      this.valueCoder = checkNotNull(valueCoder);
-    }
-
-    @Override
-    public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      if (value instanceof WatermarkTuple) {
-        outStream.write(1);
-        new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp());
-      } else {
-        outStream.write(0);
-        // OutputStream.write(int) emits only the low-order byte of the union tag.
-        outStream.write(((DataTuple<?>) value).unionTag);
-        valueCoder.encode(value.getValue(), outStream, context);
-      }
-    }
-
-    /**
-     * Decodes a tuple written by {@link #encode}.
-     *
-     * @throws CoderException if the stream ends before the marker byte or the union tag
-     */
-    @Override
-    public ApexStreamTuple<T> decode(InputStream inStream, Context context)
-        throws CoderException, IOException {
-      int b = inStream.read();
-      if (b < 0) {
-        // read() signals end of stream with -1; fail instead of decoding a bogus data tuple.
-        throw new CoderException("Unexpected end of stream while reading tuple marker");
-      }
-      if (b == 1) {
-        return new WatermarkTuple<>(new DataInputStream(inStream).readLong());
-      } else {
-        int unionTag = inStream.read();
-        if (unionTag < 0) {
-          throw new CoderException("Unexpected end of stream while reading union tag");
-        }
-        return new DataTuple<>(valueCoder.decode(inStream, context), unionTag);
-      }
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Arrays.<Coder<?>>asList(valueCoder);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic(
-          this.getClass().getSimpleName() + " requires a deterministic valueCoder",
-          valueCoder);
-    }
-
-    /**
-     * Returns the value coder.
-     */
-    public Coder<T> getValueCoder() {
-      return valueCoder;
-    }
-
-  }
-
-  /**
-   * Central place that decides whether data tuples received on and emitted from ports should
-   * be logged. Should be called in setup and the value cached in the operator.
-   */
-  final class Logging {
-    private Logging() {
-      // Static utility holder; not meant to be instantiated.
-    }
-
-    /**
-     * Returns whether tuple tracing is enabled in the given options.
-     *
-     * @param options the pipeline options that carry the tuple tracing flag
-     * @param operator the operator asking; currently not consulted
-     */
-    public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) {
-      return options.isTupleTracingEnabled();
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
deleted file mode 100644
index 61e3b83..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.netlet.util.Slice;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-
-/**
- * Adapter that lets Apex use a Beam {@link Coder} as its {@link StreamCodec}.
- *
- * <p>Values are encoded and decoded in the outer coder context. Any {@link IOException}
- * raised by the coder is rethrown wrapped in a {@link RuntimeException}.
- */
-public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable {
-  private static final long serialVersionUID = 1L;
-  private final Coder<? super Object> coder;
-
-  public CoderAdapterStreamCodec(Coder<? super Object> coder) {
-    this.coder = coder;
-  }
-
-  /**
-   * Decodes the object stored in the given slice of a byte buffer.
-   */
-  @Override
-  public Object fromByteArray(Slice fragment) {
-    final ByteArrayInputStream encoded =
-        new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
-    try {
-      return coder.decode(encoded, Context.OUTER);
-    } catch (IOException decodeFailure) {
-      throw new RuntimeException(decodeFailure);
-    }
-  }
-
-  /**
-   * Encodes the object into a new slice that covers exactly the encoded bytes.
-   */
-  @Override
-  public Slice toByteArray(Object wv) {
-    final ByteArrayOutputStream encoded = new ByteArrayOutputStream();
-    try {
-      coder.encode(wv, encoded, Context.OUTER);
-      return new Slice(encoded.toByteArray());
-    } catch (IOException encodeFailure) {
-      throw new RuntimeException(encodeFailure);
-    }
-  }
-
-  /**
-   * Uses the hash code of the object as its partition.
-   */
-  @Override
-  public int getPartition(Object o) {
-    final int partition = o.hashCode();
-    return partition;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
deleted file mode 100644
index 3b19c37..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Serializable {@link ExecutionContext.StepContext} that does nothing.
- *
- * <p>Every method that returns a value returns {@code null}, and every other method has an
- * empty body.
- */
-public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
- private static final long serialVersionUID = 1L;
-
- /** Always returns {@code null}. */
- @Override
- public String getStepName() {
- return null;
- }
-
- /** Always returns {@code null}. */
- @Override
- public String getTransformName() {
- return null;
- }
-
- /** Ignores the output. */
- @Override
- public void noteOutput(WindowedValue<?> output) {
- }
-
- /** Ignores the side output. */
- @Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
- }
-
- /** Ignores the view data; nothing is written. */
- @Override
- public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag,
- Iterable<WindowedValue<T>> data,
- Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws
- IOException {
-
- }
-
- /** Always returns {@code null}; callers must not expect state access from this context. */
- @Override
- public StateInternals<?> stateInternals() {
- return null;
- }
-
- /** Always returns {@code null}; callers must not expect timer access from this context. */
- @Override
- public TimerInternals timerInternals() {
- return null;
- }
-}