You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:23 UTC
[23/53] [abbrv] beam git commit: jstorm-runner: move most classes to
translation package and reduce their visibility to package private.
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
new file mode 100644
index 0000000..dab9518
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Values;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm spout that wraps a Beam {@link UnboundedSource}.
+ *
+ * <p>Every record read from the source is wrapped in a {@link WindowedValue} assigned to the
+ * {@link GlobalWindow}, serialized with Kryo and emitted on the stream named after the output
+ * tag. Whenever the reader's watermark advances, the new watermark is emitted on the dedicated
+ * watermark stream.
+ *
+ * <p>TODO: add wrapper to support metrics in UnboundedSource.
+ */
+public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout {
+  private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+
+  private final String description;
+  private final UnboundedSource source;
+  private final SerializedPipelineOptions serializedOptions;
+  private final TupleTag<?> outputTag;
+
+  // Runtime-only state: all of these are (re)built in open(), so none of them is serialized.
+  private transient JStormPipelineOptions pipelineOptions;
+  private transient UnboundedSource.UnboundedReader reader;
+  private transient SpoutOutputCollector collector;
+  private transient KryoSerializer<WindowedValue> serializer;
+
+  // True while the reader is positioned on a record that has not been emitted yet.
+  private volatile boolean hasNextRecord;
+  private final AtomicBoolean activated = new AtomicBoolean();
+
+  // Millis of the last watermark emitted downstream.
+  private long lastWaterMark = 0L;
+
+  /**
+   * Creates a spout for the given source.
+   *
+   * @param description human readable name, returned by {@link #toString()}
+   * @param source the unbounded source to read from
+   * @param options pipeline options; kept in serialized form until {@link #open} runs
+   * @param outputTag tag whose id names the output stream
+   * @throws NullPointerException if any argument is {@code null}
+   */
+  public UnboundedSourceSpout(
+      String description,
+      UnboundedSource source,
+      JStormPipelineOptions options,
+      TupleTag<?> outputTag) {
+    this.description = checkNotNull(description, "description");
+    this.source = checkNotNull(source, "source");
+    this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
+    this.outputTag = checkNotNull(outputTag, "outputTag");
+  }
+
+  /**
+   * Deactivates the spout and closes the reader, if one was created. A failure to close the
+   * reader is logged rather than propagated.
+   */
+  @Override
+  public synchronized void close() {
+    activated.set(false);
+    if (reader == null) {
+      // open() never ran, or failed before a reader was created: nothing to release.
+      return;
+    }
+    try {
+      reader.close();
+    } catch (IOException e) {
+      LOG.warn("Failed to close the reader of {}", description, e);
+    }
+  }
+
+  /** Allows {@link #nextTuple()} to emit records. */
+  @Override
+  public void activate() {
+    activated.set(true);
+  }
+
+  /** Stops {@link #nextTuple()} from emitting records. */
+  @Override
+  public void deactivate() {
+    activated.set(false);
+  }
+
+  /**
+   * Not supported by this spout.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void ack(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not supported by this spout.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void fail(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Returns {@code null}: this spout contributes no component specific configuration. */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  /**
+   * Restores the pipeline options, creates and starts a reader without a checkpoint, and sets
+   * up the Kryo serializer used for emitted values.
+   *
+   * @throws RuntimeException if the reader cannot be created or started
+   */
+  @Override
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    try {
+      this.collector = collector;
+      this.pipelineOptions =
+          this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+
+      createSourceReader(null);
+
+      this.serializer = new KryoSerializer<>(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create unbounded reader.", e);
+    }
+  }
+
+  /**
+   * Replaces the current reader (closing it first, if any) with a new one created from
+   * {@code checkpointMark}, and starts it.
+   *
+   * @param checkpointMark checkpoint to resume from; {@link #open} passes {@code null}
+   * @throws IOException if closing the old reader or creating/starting the new one fails
+   */
+  public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+    reader = this.source.createReader(this.pipelineOptions, checkpointMark);
+    hasNextRecord = this.reader.start();
+  }
+
+  /**
+   * Emits every record currently available from the reader, then emits the reader's watermark
+   * if it moved past the last one emitted. Does nothing while the spout is deactivated.
+   *
+   * @throws RuntimeException if reading from the source fails
+   */
+  @Override
+  public synchronized void nextTuple() {
+    if (!activated.get()) {
+      return;
+    }
+    try {
+      if (!hasNextRecord) {
+        hasNextRecord = reader.advance();
+      }
+
+      while (hasNextRecord && activated.get()) {
+        Object value = reader.getCurrent();
+        Instant timestamp = reader.getCurrentTimestamp();
+
+        WindowedValue wv =
+            WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+        LOG.debug("Source output: {}", wv.getValue());
+        if (keyedEmit(outputTag.getId())) {
+          KV kv = (KV) wv.getValue();
+          // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+          byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
+          collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
+        } else {
+          byte[] immutableValue = serializer.serialize(wv);
+          collector.emit(outputTag.getId(), new Values(immutableValue));
+        }
+
+        // move to next record
+        hasNextRecord = reader.advance();
+      }
+
+      Instant waterMark = reader.getWatermark();
+      if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
+        lastWaterMark = waterMark.getMillis();
+        // NOTE(review): the flush presumably pushes pending records out ahead of the
+        // watermark; confirm against SpoutOutputCollector#flush.
+        collector.flush();
+        collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
+        LOG.debug("Source output: WM-{}", waterMark.toDateTime());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Exception reading values from source.", e);
+    }
+  }
+
+  /** Returns the wrapped source. */
+  public UnboundedSource getUnboundedSource() {
+    return source;
+  }
+
+  /** Returns the current reader, or {@code null} if no reader has been created yet. */
+  public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
+    return reader;
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
new file mode 100644
index 0000000..54c9b94
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Read.Unbounded} into a JStorm {@link UnboundedSourceSpout}.
+ *
+ * @param <T> type of the elements produced by the unbounded read
+ */
+class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
+  /**
+   * Builds an {@link UnboundedSourceSpout} for the transform's source and registers it with the
+   * execution graph under the transform's output tag and output value.
+   *
+   * @param transform the unbounded read being translated
+   * @param context translation state holding the user graph and the execution graph
+   */
+  @Override
+  public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+
+    TupleTag<?> tag = userGraphContext.getOutputTag();
+    PValue output = userGraphContext.getOutput();
+
+    UnboundedSourceSpout spout = new UnboundedSourceSpout(
+        description,
+        transform.getSource(), userGraphContext.getOptions(), tag);
+    context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java
new file mode 100644
index 0000000..822ed8a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * JStorm {@link Executor} for {@link View}.
+ */
+class ViewExecutor implements Executor {
+
+ private final String description;
+ private final TupleTag outputTag;
+ private ExecutorsBolt executorsBolt;
+
+ public ViewExecutor(String description, TupleTag outputTag) {
+ this.description = description;
+ this.outputTag = outputTag;
+ }
+
+ @Override
+ public void init(ExecutorContext context) {
+ this.executorsBolt = context.getExecutorsBolt();
+ }
+
+ @Override
+ public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+ executorsBolt.processExecutorElem(outputTag, elem);
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
new file mode 100644
index 0000000..9ab5784
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+
+/**
+ * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
+ */
+class ViewTranslator
+ extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
+ /**
+  * Registers a {@link ViewExecutor}, labelled with the transform's description, that outputs
+  * under the user graph's current output tag.
+  */
+ @Override
+ public void translateNode(
+     CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
+   TranslationContext.UserGraphContext graph = context.getUserGraphContext();
+   String label = describeTransform(transform, graph.getInputs(), graph.getOutputs());
+   context.addTransformExecutor(new ViewExecutor(label, graph.getOutputTag()));
+ }
+
+ /**
+  * Specialized implementation for
+  * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}: the input is concatenated
+  * into a single list and wrapped in a {@link CreateJStormPCollectionView}.
+  */
+ public static class ViewAsMap<K, V>
+     extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+   /**
+    * Builds an instance of this class from the overridden transform.
+    */
+   @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+   public ViewAsMap(View.AsMap<K, V> transform) {
+   }
+
+   @Override
+   public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+     PCollectionView<Map<K, V>> mapView =
+         PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder());
+
+     @SuppressWarnings({"rawtypes", "unchecked"})
+     KvCoder<K, V> kvCoder = (KvCoder) input.getCoder();
+     try {
+       kvCoder.getKeyCoder().verifyDeterministic();
+     } catch (Coder.NonDeterministicException ignored) {
+       // TODO: log warning as other runners.
+     }
+
+     PCollection<List<KV<K, V>>> concatenated =
+         input.apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults());
+     return concatenated.apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(mapView));
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingViewAsMap";
+   }
+ }
+
+ /**
+  * Specialized expansion for {@link View.AsMultimap View.AsMultimap}: the input is
+  * concatenated into a single list and wrapped in a {@link CreateJStormPCollectionView}.
+  */
+ public static class ViewAsMultimap<K, V>
+     extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+   /**
+    * Builds an instance of this class from the overridden transform.
+    */
+   @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+   public ViewAsMultimap(View.AsMultimap<K, V> transform) {
+   }
+
+   @Override
+   public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+     PCollectionView<Map<K, Iterable<V>>> multimapView =
+         PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder());
+
+     @SuppressWarnings({"rawtypes", "unchecked"})
+     KvCoder<K, V> kvCoder = (KvCoder) input.getCoder();
+     try {
+       kvCoder.getKeyCoder().verifyDeterministic();
+     } catch (Coder.NonDeterministicException ignored) {
+       // TODO: log warning as other runners.
+     }
+
+     PCollection<List<KV<K, V>>> concatenated =
+         input.apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults());
+     return concatenated.apply(
+         CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(multimapView));
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingViewAsMultimap";
+   }
+ }
+
+ /**
+  * Specialized implementation for {@link View.AsList View.AsList}: the input is concatenated
+  * into a single list and wrapped in a {@link CreateJStormPCollectionView}.
+  */
+ public static class ViewAsList<T>
+     extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+   /**
+    * Builds an instance of this class from the overridden transform.
+    */
+   @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+   public ViewAsList(View.AsList<T> transform) {
+   }
+
+   @Override
+   public PCollectionView<List<T>> expand(PCollection<T> input) {
+     PCollectionView<List<T>> listView =
+         PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder());
+
+     PCollection<List<T>> concatenated =
+         input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults());
+     return concatenated.apply(CreateJStormPCollectionView.<T, List<T>>of(listView));
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingViewAsList";
+   }
+ }
+
+ /**
+  * Specialized implementation for {@link View.AsIterable View.AsIterable} for the
+  * JStorm runner in streaming mode: the input is concatenated into a single list and wrapped
+  * in a {@link CreateJStormPCollectionView}.
+  */
+ public static class ViewAsIterable<T>
+     extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+   /**
+    * Builds an instance of this class from the overridden transform.
+    */
+   @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+   public ViewAsIterable(View.AsIterable<T> transform) {
+   }
+
+   @Override
+   public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+     PCollectionView<Iterable<T>> iterableView =
+         PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
+
+     PCollection<List<T>> concatenated =
+         input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults());
+     return concatenated.apply(CreateJStormPCollectionView.<T, Iterable<T>>of(iterableView));
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingViewAsIterable";
+   }
+ }
+
+ /**
+  * Specialized expansion for {@link View.AsSingleton View.AsSingleton} for the
+  * JStorm runner in streaming mode. The input is reduced with a combine function that rejects
+  * a second element, and the result is exposed as a singleton view.
+  */
+ public static class ViewAsSingleton<T>
+     extends PTransform<PCollection<T>, PCollectionView<T>> {
+   private final View.AsSingleton<T> transform;
+
+   /**
+    * Builds an instance of this class from the overridden transform.
+    */
+   @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+   public ViewAsSingleton(View.AsSingleton<T> transform) {
+     this.transform = transform;
+   }
+
+   @Override
+   public PCollectionView<T> expand(PCollection<T> input) {
+     Combine.Globally<T, T> combine = Combine.globally(
+         new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+     if (!transform.hasDefaultValue()) {
+       combine = combine.withoutDefaults();
+     }
+     return input.apply(combine.asSingletonView());
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingViewAsSingleton";
+   }
+
+   /**
+    * Combine function that fails as soon as two elements have to be combined, and whose
+    * identity is the configured default value (or a failure when there is none).
+    */
+   private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+     private final boolean hasDefaultValue;
+     private final T defaultValue;
+
+     SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+       this.hasDefaultValue = hasDefaultValue;
+       this.defaultValue = defaultValue;
+     }
+
+     /**
+      * Always fails: combining two values means the input held more than one element.
+      *
+      * @throws IllegalArgumentException always
+      */
+     @Override
+     public T apply(T left, T right) {
+       // The hint names the method that actually exists on Combine.Globally.
+       throw new IllegalArgumentException("PCollection with more than one element "
+           + "accessed as a singleton view. Consider using "
+           + "Combine.globally().asSingletonView() to "
+           + "combine the PCollection into a single value");
+     }
+
+     /**
+      * Returns the default value.
+      *
+      * @throws IllegalArgumentException if no default value was configured
+      */
+     @Override
+     public T identity() {
+       if (!hasDefaultValue) {
+         // The hint names the method that actually exists on View.AsSingleton.
+         throw new IllegalArgumentException(
+             "Empty PCollection accessed as a singleton view. "
+                 + "Consider setting withDefaultValue to provide a default value");
+       }
+       return defaultValue;
+     }
+   }
+ }
+
+ /**
+  * Specialized expansion for
+  * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}: combines the input
+  * without defaults, wraps each combined value in a singleton list and exposes it through a
+  * {@link CreateJStormPCollectionView}.
+  *
+  * @param <InputT> type of the elements being combined
+  * @param <OutputT> type of the combined value exposed by the view
+  */
+ public static class CombineGloballyAsSingletonView<InputT, OutputT>
+     extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+   Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+   /**
+    * Builds an instance of this class from the overridden transform.
+    */
+   @SuppressWarnings("unused") // instantiated via reflection by the runner
+   public CombineGloballyAsSingletonView(
+       Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+     this.transform = transform;
+   }
+
+   @Override
+   public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+     PCollection<OutputT> combinedValues =
+         input.apply(Combine.globally(transform.getCombineFn())
+             .withoutDefaults()
+             .withFanout(transform.getFanout()));
+
+     PCollectionView<OutputT> singletonView = PCollectionViews.singletonView(
+         combinedValues,
+         combinedValues.getWindowingStrategy(),
+         transform.getInsertDefault(),
+         transform.getInsertDefault()
+             ? transform.getCombineFn().defaultValue() : null,
+         combinedValues.getCoder());
+
+     PCollection<List<OutputT>> wrapped =
+         combinedValues.apply(ParDo.of(new WrapAsList<OutputT>()));
+     return wrapped.apply(CreateJStormPCollectionView.<OutputT, OutputT>of(singletonView));
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingCombineGloballyAsSingletonView";
+   }
+ }
+
+ /** {@link DoFn} that outputs each input element wrapped in a single-element list. */
+ private static class WrapAsList<T> extends DoFn<T, List<T>> {
+   @ProcessElement
+   public void processElement(ProcessContext c) {
+     List<T> wrapped = Collections.singletonList(c.element());
+     c.output(wrapped);
+   }
+ }
+
+ /**
+  * Combiner that gathers every input {@code T} into one {@code List<T>}.
+  * The whole input {@link PCollection} therefore has to fit in memory;
+  * for a large {@link PCollection} this is expected to crash!
+  *
+  * @param <T> the type of elements to concatenate.
+  */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+   private static final long serialVersionUID = 1L;
+
+   /** Starts from an empty, mutable list. */
+   @Override
+   public List<T> createAccumulator() {
+     return new ArrayList<>();
+   }
+
+   /** Appends {@code input} to the accumulator in place and returns that same accumulator. */
+   @Override
+   public List<T> addInput(List<T> accumulator, T input) {
+     accumulator.add(input);
+     return accumulator;
+   }
+
+   /** Copies the contents of all accumulators, in iteration order, into a fresh list. */
+   @Override
+   public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+     List<T> merged = createAccumulator();
+     for (List<T> part : accumulators) {
+       merged.addAll(part);
+     }
+     return merged;
+   }
+
+   /** The accumulator itself is the output. */
+   @Override
+   public List<T> extractOutput(List<T> accumulator) {
+     return accumulator;
+   }
+
+   @Override
+   public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+     Coder<List<T>> listCoder = ListCoder.of(inputCoder);
+     return listCoder;
+   }
+
+   @Override
+   public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+     Coder<List<T>> listCoder = ListCoder.of(inputCoder);
+     return listCoder;
+   }
+ }
+
+ /**
+  * Creates a primitive {@link PCollectionView}: expanding the transform simply returns the
+  * view it was built with.
+  * For internal use only by runner implementors.
+  *
+  * @param <ElemT> The type of the elements of the input PCollection
+  * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+  */
+ public static class CreateJStormPCollectionView<ElemT, ViewT>
+     extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+   private PCollectionView<ViewT> view;
+
+   /** Static factory; the constructor is private. */
+   public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
+       PCollectionView<ViewT> view) {
+     CreateJStormPCollectionView<ElemT, ViewT> created = new CreateJStormPCollectionView<>(view);
+     return created;
+   }
+
+   private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
+     this.view = view;
+   }
+
+   /** Returns the view supplied at construction; {@code input} is not inspected. */
+   @Override
+   public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+     return this.view;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
new file mode 100644
index 0000000..8d60392
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
+ *
+ * <p>Each incoming element is re-emitted once per window that the {@link WindowFn} assigns to
+ * it, under this executor's output tag.
+ *
+ * @param <T> type of the elements being windowed
+ * @param <W> type of the windows produced by the window function
+ */
+class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
+  private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
+
+  private final String description;
+  private final WindowFn<T, W> windowFn;
+  private final TupleTag outputTag;
+  // Assigned in init(); not available before that call.
+  private ExecutorsBolt executorsBolt;
+
+  /**
+   * {@link WindowFn.AssignContext} backed by a single {@link WindowedValue}.
+   *
+   * <p>The type parameters carry their own names so they do not shadow the enclosing class's
+   * {@code W}.
+   *
+   * @param <InputT> element type of the window function
+   * @param <WindowT> window type of the window function
+   */
+  class JStormAssignContext<InputT, WindowT extends BoundedWindow>
+      extends WindowFn<InputT, WindowT>.AssignContext {
+    private final WindowedValue<InputT> value;
+
+    /**
+     * @throws IllegalArgumentException if {@code value} is not in exactly one window
+     */
+    JStormAssignContext(WindowFn<InputT, WindowT> fn, WindowedValue<InputT> value) {
+      fn.super();
+      // Template overload: the message is only formatted when the check fails.
+      checkArgument(
+          Iterables.size(value.getWindows()) == 1,
+          "%s passed to window assignment must be in a single window, but it was in %s: %s",
+          WindowedValue.class.getSimpleName(),
+          Iterables.size(value.getWindows()),
+          value.getWindows());
+      this.value = value;
+    }
+
+    @Override
+    public InputT element() {
+      return value.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return value.getTimestamp();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(value.getWindows());
+    }
+  }
+
+  /**
+   * @param description text returned by {@link #toString()}
+   * @param windowFn window function used to assign windows
+   * @param outputTag tag under which re-windowed elements are forwarded
+   */
+  public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
+    this.description = description;
+    this.windowFn = windowFn;
+    this.outputTag = outputTag;
+  }
+
+  /** Remembers the bolt that re-windowed elements are forwarded to. */
+  @Override
+  public void init(ExecutorContext context) {
+    this.executorsBolt = context.getExecutorsBolt();
+  }
+
+  /**
+   * Assigns windows to {@code elem} and forwards one copy per assigned window, keeping the
+   * element's value, timestamp and pane. Any failure is logged as a warning and the element
+   * is not forwarded for the remaining windows.
+   */
+  @Override
+  public void process(TupleTag tag, WindowedValue elem) {
+    try {
+      Collection<W> windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
+      for (W window : windows) {
+        executorsBolt.processExecutorElem(
+            outputTag,
+            WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to assign windows for elem={}", elem, e);
+    }
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void cleanup() {
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java
new file mode 100644
index 0000000..86cb638
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+/**
+ * Translates a {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} to a
+ * JStorm {@link WindowAssignExecutor}.
+ *
+ * @param <T> type of the elements being windowed
+ */
+class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
+
+  /**
+   * Marks the user graph as windowed and registers a {@link WindowAssignExecutor} built from
+   * the transform's window function and the current output tag.
+   */
+  @Override
+  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext graph = context.getUserGraphContext();
+    String label = describeTransform(transform, graph.getInputs(), graph.getOutputs());
+    context.getUserGraphContext().setWindowed();
+    context.addTransformExecutor(
+        new WindowAssignExecutor(label, transform.getWindowFn(), graph.getOutputTag()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java
new file mode 100644
index 0000000..f8f2f3f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the translation to JStorm topology.
+ */
+package org.apache.beam.runners.jstorm.translation;
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
deleted file mode 100644
index 3d7fab8..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-
-/**
- * Enable user to add output stream definitions by API, rather than hard-code.
- */
-public abstract class AbstractComponent implements IComponent {
- private Map<String, Fields> streamToFields = new HashMap<>();
- private Map<String, Boolean> keyStreams = new HashMap<>();
- private int parallelismNum = 0;
-
- public void addOutputField(String streamId) {
- addOutputField(streamId, new Fields(CommonInstance.VALUE));
- }
-
- public void addOutputField(String streamId, Fields fields) {
- streamToFields.put(streamId, fields);
- keyStreams.put(streamId, false);
- }
-
- public void addKVOutputField(String streamId) {
- streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
- keyStreams.put(streamId, true);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
- declarer.declareStream(entry.getKey(), entry.getValue());
- }
- }
-
- public boolean keyedEmit(String streamId) {
- Boolean isKeyedStream = keyStreams.get(streamId);
- return isKeyedStream == null ? false : isKeyedStream;
- }
-
- public int getParallelismNum() {
- return parallelismNum;
- }
-
- public void setParallelismNum(int num) {
- parallelismNum = num;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
deleted file mode 100644
index e07d890..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.metric.MetricClient;
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext;
-import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JStorm {@link Executor} for {@link DoFn}.
- * @param <InputT> input type
- * @param <OutputT> output type
- */
-public class DoFnExecutor<InputT, OutputT> implements Executor {
- private static final long serialVersionUID = 5297603063991078668L;
-
- private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
-
- /**
- * Implements {@link OutputManager} in a DoFn executor.
- */
- public class DoFnExecutorOutputManager implements OutputManager, Serializable {
- private static final long serialVersionUID = -661113364735206170L;
-
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- executorsBolt.processExecutorElem(tag, output);
- }
- }
-
- protected transient DoFnRunner<InputT, OutputT> runner = null;
- protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
-
- protected final String stepName;
-
- protected int internalDoFnExecutorId;
-
- protected final String description;
-
- protected final TupleTag<OutputT> mainTupleTag;
- protected final List<TupleTag<?>> sideOutputTags;
-
- protected SerializedPipelineOptions serializedOptions;
- protected transient JStormPipelineOptions pipelineOptions;
-
- protected DoFn<InputT, OutputT> doFn;
- protected final Coder<WindowedValue<InputT>> inputCoder;
- protected DoFnInvoker<InputT, OutputT> doFnInvoker;
- protected OutputManager outputManager;
- protected WindowingStrategy<?, ?> windowingStrategy;
- protected final TupleTag<InputT> mainInputTag;
- protected Collection<PCollectionView<?>> sideInputs;
- protected SideInputHandler sideInputHandler;
- protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
-
- // Initialize during runtime
- protected ExecutorContext executorContext;
- protected ExecutorsBolt executorsBolt;
- protected TimerInternals timerInternals;
- protected transient StateInternals pushbackStateInternals;
- protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
- protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
- protected transient IKvStoreManager kvStoreManager;
- protected DefaultStepContext stepContext;
- protected transient MetricClient metricClient;
-
- public DoFnExecutor(
- String stepName,
- String description,
- JStormPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
- Coder<WindowedValue<InputT>> inputCoder,
- WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<InputT> mainInputTag,
- Collection<PCollectionView<?>> sideInputs,
- Map<TupleTag, PCollectionView<?>> sideInputTagToView,
- TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags) {
- this.stepName = checkNotNull(stepName, "stepName");
- this.description = checkNotNull(description, "description");
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
- this.doFn = doFn;
- this.inputCoder = inputCoder;
- this.outputManager = new DoFnExecutorOutputManager();
- this.windowingStrategy = windowingStrategy;
- this.mainInputTag = mainInputTag;
- this.sideInputs = sideInputs;
- this.mainTupleTag = mainTupleTag;
- this.sideOutputTags = sideOutputTags;
- this.sideInputTagToView = sideInputTagToView;
- }
-
- protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
- return new DoFnRunnerWithMetrics<>(
- stepName,
- DoFnRunners.simpleRunner(
- this.pipelineOptions,
- this.doFn,
- this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
- this.outputManager,
- this.mainTupleTag,
- this.sideOutputTags,
- this.stepContext,
- this.windowingStrategy),
- MetricsReporter.create(metricClient));
- }
-
- protected void initService(ExecutorContext context) {
- // TODO: what should be set for key in here?
- timerInternals = new JStormTimerInternals(
- null /* key */, this, context.getExecutorsBolt().timerService());
- kvStoreManager = context.getKvStoreManager();
- stepContext = new DefaultStepContext(timerInternals,
- new JStormStateInternals(
- null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- metricClient = new MetricClient(executorContext.getTopologyContext());
- }
-
- @Override
- public void init(ExecutorContext context) {
- this.executorContext = context;
- this.executorsBolt = context.getExecutorsBolt();
- this.pipelineOptions =
- this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
-
- initService(context);
-
- // Side inputs setup
- if (sideInputs != null && !sideInputs.isEmpty()) {
- pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
- watermarkHoldTag =
- StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
- pushbackStateInternals = new JStormStateInternals(
- null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
- sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
- runner = getDoFnRunner();
- pushbackRunner =
- SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
- } else {
- runner = getDoFnRunner();
- }
-
- // Process user's setup
- doFnInvoker = DoFnInvokers.invokerFor(doFn);
- doFnInvoker.invokeSetup();
- }
-
- @Override
- public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
- tag, mainInputTag, sideInputs, elem.getValue()));
- if (mainInputTag.equals(tag)) {
- processMainInput(elem);
- } else {
- processSideInput(tag, elem);
- }
- }
-
- protected <T> void processMainInput(WindowedValue<T> elem) {
- if (sideInputs.isEmpty()) {
- runner.processElement((WindowedValue<InputT>) elem);
- } else {
- Iterable<WindowedValue<InputT>> justPushedBack =
- pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
- BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
- Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
- for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
- if (pushedBackValue.getTimestamp().isBefore(min)) {
- min = pushedBackValue.getTimestamp();
- }
- min = earlier(min, pushedBackValue.getTimestamp());
- pushedBack.add(pushedBackValue);
- }
- pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
- }
- }
-
- protected void processSideInput(TupleTag tag, WindowedValue elem) {
- LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
-
- PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
- sideInputHandler.addSideInputValue(sideInputView, elem);
-
- BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
- List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-
- Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
- if (pushedBackInputs != null) {
- for (WindowedValue<InputT> input : pushedBackInputs) {
-
- Iterable<WindowedValue<InputT>> justPushedBack =
- pushbackRunner.processElementInReadyWindows(input);
- Iterables.addAll(newPushedBack, justPushedBack);
- }
- }
- pushedBack.clear();
-
- Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
- for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
- min = earlier(min, pushedBackValue.getTimestamp());
- pushedBack.add(pushedBackValue);
- }
-
- WatermarkHoldState watermarkHold =
- pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
- // TODO: clear-then-add is not thread-safe.
- watermarkHold.clear();
- watermarkHold.add(min);
- }
-
- /**
- * Process all pushed back elements when receiving watermark with max timestamp.
- */
- public void processAllPushBackElements() {
- if (sideInputs != null && !sideInputs.isEmpty()) {
- BagState<WindowedValue<InputT>> pushedBackElements =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
- if (pushedBackElements != null) {
- for (WindowedValue<InputT> elem : pushedBackElements.read()) {
- LOG.info("Process pushback elem={}", elem);
- runner.processElement(elem);
- }
- pushedBackElements.clear();
- }
-
- WatermarkHoldState watermarkHold =
- pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
- watermarkHold.clear();
- watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
- }
- }
-
- public void onTimer(Object key, TimerInternals.TimerData timerData) {
- StateNamespace namespace = timerData.getNamespace();
- checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
- BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
- if (pushbackRunner != null) {
- pushbackRunner.onTimer(
- timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
- } else {
- runner.onTimer(
- timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
- }
- }
-
- @Override
- public void cleanup() {
- doFnInvoker.invokeTeardown();
- }
-
- @Override
- public String toString() {
- return description;
- }
-
- private Instant earlier(Instant left, Instant right) {
- return left.isBefore(right) ? left : right;
- }
-
- public void startBundle() {
- if (pushbackRunner != null) {
- pushbackRunner.startBundle();
- } else {
- runner.startBundle();
- }
- }
-
- public void finishBundle() {
- if (pushbackRunner != null) {
- pushbackRunner.finishBundle();
- } else {
- runner.finishBundle();
- }
- }
-
- public void setInternalDoFnExecutorId(int id) {
- this.internalDoFnExecutorId = id;
- }
-
- public int getInternalDoFnExecutorId() {
- return internalDoFnExecutorId;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
deleted file mode 100644
index 1610a8a..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Closeable;
-import java.io.IOException;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-
-/**
- * DoFnRunner decorator which registers {@link MetricsContainer}.
- */
-public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
- private final String stepName;
- private final DoFnRunner<InputT, OutputT> delegate;
- private final MetricsReporter metricsReporter;
-
- DoFnRunnerWithMetrics(
- String stepName,
- DoFnRunner<InputT, OutputT> delegate,
- MetricsReporter metricsReporter) {
- this.stepName = checkNotNull(stepName, "stepName");
- this.delegate = checkNotNull(delegate, "delegate");
- this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter");
- }
-
- @Override
- public void startBundle() {
- try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
- metricsReporter.getMetricsContainer(stepName))) {
- delegate.startBundle();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void processElement(WindowedValue<InputT> elem) {
- try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
- metricsReporter.getMetricsContainer(stepName))) {
- delegate.processElement(elem);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
- try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
- metricsReporter.getMetricsContainer(stepName))) {
- delegate.onTimer(timerId, window, timestamp, timeDomain);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void finishBundle() {
- try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
- metricsReporter.getMetricsContainer(stepName))) {
- delegate.finishBundle();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- metricsReporter.updateMetrics();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
deleted file mode 100644
index 0ec4fdd..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * An executor is a basic executable unit in a JStorm task.
- */
-public interface Executor extends Serializable {
- /**
- * Initialization during runtime.
- */
- void init(ExecutorContext context);
-
- <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
-
- void cleanup();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
deleted file mode 100644
index 55ca171..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.task.TopologyContext;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.google.auto.value.AutoValue;
-
-/**
- * Context of a executors bolt when runtime.
- */
-@AutoValue
-public abstract class ExecutorContext {
- public static ExecutorContext of(
- TopologyContext topologyContext,
- ExecutorsBolt bolt,
- IKvStoreManager kvStoreManager) {
- return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
- }
-
- public abstract TopologyContext getTopologyContext();
-
- public abstract ExecutorsBolt getExecutorsBolt();
-
- public abstract IKvStoreManager getKvStoreManager();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
deleted file mode 100644
index 0366c13..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBatchBolt;
-import backtype.storm.tuple.ITupleExt;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.KvStoreManagerFactory;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.utils.KryoSerializer;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ExecutorsBolt is a JStorm Bolt composited with several executors chained in a sub-DAG.
- */
-public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
- private static final long serialVersionUID = -7751043327801735211L;
-
- private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
-
- protected ExecutorContext executorContext;
-
- protected TimerService timerService;
-
- // map from input tag to executor inside bolt
- protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
- // set of all output tags that will be emit outside bolt
- protected final Set<TupleTag> outputTags = Sets.newHashSet();
- protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
- protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
- protected int internalDoFnExecutorId = 1;
- protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
-
- protected OutputCollector collector;
-
- protected boolean isStatefulBolt = false;
-
- protected KryoSerializer<WindowedValue> serializer;
-
- public ExecutorsBolt() {
-
- }
-
- public void setStatefulBolt(boolean isStateful) {
- isStatefulBolt = isStateful;
- }
-
- public void addExecutor(TupleTag inputTag, Executor executor) {
- inputTagToExecutor.put(
- checkNotNull(inputTag, "inputTag"),
- checkNotNull(executor, "executor"));
- }
-
- public Map<TupleTag, Executor> getExecutors() {
- return inputTagToExecutor;
- }
-
- public void registerExecutor(Executor executor) {
- if (executor instanceof DoFnExecutor) {
- DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
- idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
- doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
- internalDoFnExecutorId++;
- }
- }
-
- public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
- return idToDoFnExecutor;
- }
-
- public void addOutputTags(TupleTag tag) {
- outputTags.add(tag);
- }
-
- public void addExternalOutputTag(TupleTag<?> tag) {
- externalOutputTags.add(tag);
- }
-
- public Set<TupleTag> getOutputTags() {
- return outputTags;
- }
-
- public ExecutorContext getExecutorContext() {
- return executorContext;
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- LOG.info("Start to prepare for task-{}", context.getThisTaskId());
- try {
- this.collector = collector;
-
- // init kv store manager
- String storeName = String.format("task-%d", context.getThisTaskId());
- String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
- IKvStoreManager kvStoreManager = isStatefulBolt
- ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(
- context, storeName, stateStorePath, isStatefulBolt)
- : KvStoreManagerFactory.getKvStoreManager(
- stormConf, storeName, stateStorePath, isStatefulBolt);
- this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
-
- // init time service
- timerService = initTimerService();
-
- // init all internal executors
- for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
- executor.init(executorContext);
- if (executor instanceof DoFnExecutor) {
- doFnExecutors.add((DoFnExecutor) executor);
- }
- }
-
- this.serializer = new KryoSerializer<WindowedValue>(stormConf);
-
- LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
- LOG.info("inputTagToExecutor={}", inputTagToExecutor);
- LOG.info("outputTags={}", outputTags);
- LOG.info("externalOutputTags={}", externalOutputTags);
- LOG.info("doFnExecutors={}", doFnExecutors);
- } catch (IOException e) {
- throw new RuntimeException("Failed to prepare executors bolt", e);
- }
- }
-
- public TimerService initTimerService() {
- TopologyContext context = executorContext.getTopologyContext();
- List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
- .transformAndConcat(
- new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
- @Override
- public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
- if (Common.isSystemComponent(value.getKey())) {
- return Collections.EMPTY_LIST;
- } else {
- return value.getValue();
- }
- }
- })
- .toList();
- TimerService ret = new TimerServiceImpl(executorContext);
- ret.init(tasks);
- return ret;
- }
-
- @Override
- public void execute(Tuple input) {
- // process a batch
- String streamId = input.getSourceStreamId();
- ITupleExt tuple = (ITupleExt) input;
- Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
- if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
- while (valueIterator.hasNext()) {
- processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
- }
- } else {
- doFnStartBundle();
- while (valueIterator.hasNext()) {
- processElement(valueIterator.next(), streamId);
- }
- doFnFinishBundle();
- }
- }
-
- private void processWatermark(long watermarkTs, int sourceTask) {
- long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
- LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
- (new Instant(watermarkTs)).toDateTime(),
- sourceTask,
- (new Instant(newWaterMark)).toDateTime());
- if (newWaterMark != 0) {
- // Some buffer windows are going to be triggered.
- doFnStartBundle();
- timerService.fireTimers(newWaterMark);
-
- // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
- // to be received from now on. So we are going to process all push back data.
- if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
- for (DoFnExecutor doFnExecutor : doFnExecutors) {
- doFnExecutor.processAllPushBackElements();
- }
- }
-
- doFnFinishBundle();
- }
-
- long currentWaterMark = timerService.currentOutputWatermark();
- if (!externalOutputTags.isEmpty()) {
- collector.flush();
- collector.emit(
- CommonInstance.BEAM_WATERMARK_STREAM_ID,
- new Values(currentWaterMark));
- LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
- }
- }
-
- /**
-  * Decodes one batch entry and dispatches it under a tag named after the stream it came from.
-  *
-  * @param values the raw tuple fields of the entry
-  * @param streamId id of the stream the tuple arrived on; used as the input tag id
-  */
- private void processElement(List<Object> values, String streamId) {
-   processExecutorElem(new TupleTag(streamId), retrieveWindowedValueFromTupleValue(values));
- }
-
- /**
-  * Routes an element to whatever consumes {@code inputTag}: the executor registered for the
-  * tag inside this bolt, the external output for the tag, or both.
-  *
-  * <p>A {@code null} element is logged and dropped.
-  *
-  * @param inputTag tag the element is addressed to
-  * @param elem the element to route; may be {@code null}
-  * @param <T> value type carried by the tag and the element
-  */
- public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
-   // The null check must come before any use of elem: logging elem.getValue() first would
-   // throw a NullPointerException and make the null branch unreachable.
-   if (elem == null) {
-     LOG.info("Received null elem for tag={}", inputTag);
-     return;
-   }
-   LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
-
-   Executor executor = inputTagToExecutor.get(inputTag);
-   if (executor != null) {
-     executor.process(inputTag, elem);
-   }
-   if (externalOutputTags.contains(inputTag)) {
-     emitOutsideBolt(inputTag, elem);
-   }
- }
-
- /**
-  * Cleans up every executor hosted by this bolt, then closes the KV store manager.
-  */
- @Override
- public void cleanup() {
-   // Copy into a set first so that an executor registered under several input tags is
-   // cleaned up only once.
-   for (Executor hosted : Sets.newHashSet(inputTagToExecutor.values())) {
-     hosted.cleanup();
-   }
-
-   executorContext.getKvStoreManager().close();
- }
-
- /**
-  * Returns no component-specific configuration.
-  *
-  * <p>NOTE(review): presumably the framework treats {@code null} as "no overrides" -- confirm
-  * against the interface this method overrides.
-  *
-  * @return always {@code null}
-  */
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- /**
-  * Returns the timer service currently held by this bolt.
-  *
-  * @return the {@link TimerService} stored in this bolt
-  */
- public TimerService timerService() {
-   return this.timerService;
- }
-
- /**
-  * Replaces the timer service held by this bolt.
-  *
-  * @param service the {@link TimerService} to use from now on
-  */
- public void setTimerService(TimerService service) {
-   this.timerService = service;
- }
-
- /**
-  * Rebuilds the {@link WindowedValue} carried by a tuple.
-  *
-  * <p>A tuple with more than one field is read as (key, serialized windowed value) and is
-  * turned back into a windowed {@code KV} of that key and the deserialized value. Otherwise the
-  * first field is read as the serialized windowed value itself.
-  *
-  * @param values the raw tuple fields
-  * @return the deserialized windowed value
-  */
- private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
-   if (values.size() <= 1) {
-     // Un-keyed tuple: the only field is the serialized windowed value.
-     return serializer.deserialize((byte[]) values.get(0));
-   }
-
-   // Keyed tuple: <K, WindowedValue<V>> back to WindowedValue<KV<K, V>>.
-   Object key = values.get(0);
-   WindowedValue payload = serializer.deserialize((byte[]) values.get(1));
-   return payload.withValue(KV.of(key, payload.getValue()));
- }
-
- protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
- LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
- if (keyedEmit(outputTag.getId())) {
- KV kv = (KV) outputValue.getValue();
- byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
- // Convert WindowedValue<KV> to <K, WindowedValue<V>>
- if (kv.getKey() == null) {
- // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
- collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
- } else {
- collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
- }
- } else {
- byte[] immutableOutputValue = serializer.serialize(outputValue);
- collector.emit(outputTag.getId(), new Values(immutableOutputValue));
- }
- }
-
- /** Calls {@code startBundle()} on every DoFn executor hosted by this bolt. */
- private void doFnStartBundle() {
-   for (DoFnExecutor hosted : doFnExecutors) {
-     hosted.startBundle();
-   }
- }
-
- /** Calls {@code finishBundle()} on every DoFn executor hosted by this bolt. */
- private void doFnFinishBundle() {
-   for (DoFnExecutor hosted : doFnExecutors) {
-     hosted.finishBundle();
-   }
- }
-
- @Override
- public String toString() {
- // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString());
- List<String> ret = new ArrayList<>();
- /*ret.add("inputTags");
- for (TupleTag inputTag : inputTagToExecutor.keySet()) {
- ret.add(inputTag.getId());
- }*/
- ret.add("internalExecutors");
- for (Executor executor : inputTagToExecutor.values()) {
- ret.add(executor.toString());
- }
- ret.add("externalOutputTags");
- for (TupleTag output : externalOutputTags) {
- ret.add(output.getId());
- }
- return Joiner.on('\n').join(ret).concat("\n");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
deleted file mode 100644
index caf1e47..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}.
- *
- * <p>Every element it receives, whatever tag it arrived under, is handed back to the owning
- * {@link ExecutorsBolt} under the single main output tag given at construction.
- *
- * @param <InputT> element type of the flattened inputs (not referenced in the class body)
- */
-public class FlattenExecutor<InputT> implements Executor {
-
-  private final String description;
-  private TupleTag mainOutputTag;
-  private ExecutorContext context;
-  private ExecutorsBolt executorsBolt;
-
-  /**
-   * Creates a flatten executor.
-   *
-   * @param description text returned by {@link #toString()}; must not be null
-   * @param mainTupleTag tag under which every incoming element is re-dispatched
-   */
-  public FlattenExecutor(String description, TupleTag mainTupleTag) {
-    this.description = checkNotNull(description, "description");
-    this.mainOutputTag = mainTupleTag;
-  }
-
-  /**
-   * Stores the executor context and the bolt obtained from it.
-   *
-   * @param context the context this executor runs in
-   */
-  @Override
-  public void init(ExecutorContext context) {
-    this.context = context;
-    this.executorsBolt = context.getExecutorsBolt();
-  }
-
-  /**
-   * Re-dispatches the element under the main output tag.
-   *
-   * @param tag tag the element arrived under; not used
-   * @param elem the element to forward
-   */
-  @Override
-  public void process(TupleTag tag, WindowedValue elem) {
-    this.executorsBolt.processExecutorElem(this.mainOutputTag, elem);
-  }
-
-  /** Does nothing. */
-  @Override
-  public void cleanup() {
-  }
-
-  /** Returns the description given at construction. */
-  @Override
-  public String toString() {
-    return this.description;
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
deleted file mode 100644
index 0dd1af9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
- *
- * <p>Incoming {@code KV} elements are converted to {@link KeyedWorkItem}s and run through a
- * group-also-by-window {@link DoFn} that is built when the runner is requested, wrapped in a
- * late-data-dropping runner.
- *
- * @param <K> key type of the grouped elements
- * @param <V> value type of the grouped elements
- */
-public class GroupByWindowExecutor<K, V>
-    extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
-  private static final long serialVersionUID = -7563050475488610553L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
-
-  /** Output manager that hands every output back to the owning bolt for routing. */
-  private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      executorsBolt.processExecutorElem(tag, output);
-    }
-  }
-
-  private KvCoder<K, V> inputKvCoder;
-  private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
-
-  /**
-   * Creates the executor.
-   *
-   * @param stepName name of the step, passed to the superclass
-   * @param description description of the step, passed to the superclass
-   * @param context translation context; its user graph input supplies the input KV coder
-   * @param pipelineOptions pipeline options, passed to the superclass
-   * @param windowingStrategy windowing strategy, passed to the superclass
-   * @param mainTupleTag tag of the grouped output
-   * @param sideOutputTags tags of any additional outputs
-   */
-  public GroupByWindowExecutor(
-      String stepName,
-      String description,
-      TranslationContext context,
-      JStormPipelineOptions pipelineOptions,
-      WindowingStrategy<?, ?> windowingStrategy,
-      TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
-    // The DoFn is only built at runtime (see getDoFnRunner), so the superclass gets null for
-    // it and for the other arguments this executor does not supply.
-    super(
-        stepName,
-        description,
-        pipelineOptions,
-        null,
-        null,
-        windowingStrategy,
-        null,
-        null,
-        null,
-        mainTupleTag,
-        sideOutputTags);
-
-    this.outputManager = new GroupByWindowOutputManager();
-    PCollection<KV<K, V>> keyedInput =
-        (PCollection<KV<K, V>>) context.getUserGraphContext().getInput();
-    this.inputKvCoder = (KvCoder<K, V>) keyedInput.getCoder();
-  }
-
-  /**
-   * Builds the group-also-by-window {@link DoFn}, with per-key state and timer internals
-   * backed by this executor's KV store manager and the bolt's timer service.
-   */
-  private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
-    final StateInternalsFactory<K> perKeyState = new StateInternalsFactory<K>() {
-      @Override
-      public StateInternals stateInternalsForKey(K key) {
-        return new JStormStateInternals<K>(
-            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
-      }
-    };
-    TimerInternalsFactory<K> perKeyTimers = new TimerInternalsFactory<K>() {
-      @Override
-      public TimerInternals timerInternalsForKey(K key) {
-        return new JStormTimerInternals<>(
-            key,
-            GroupByWindowExecutor.this,
-            executorContext.getExecutorsBolt().timerService());
-      }
-    };
-
-    // Buffer all values of a key and window, using the value coder of the input.
-    reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
-    return GroupAlsoByWindowViaWindowSetNewDoFn.create(
-        windowingStrategy, perKeyState, perKeyTimers, NullSideInputReader.empty(),
-        (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
-  }
-
-  /**
-   * Builds the runner chain: a simple runner around the group-also-by-window DoFn, wrapped in a
-   * late-data-dropping runner, wrapped in a metrics-reporting runner.
-   */
-  @Override
-  protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
-    doFn = getGroupByWindowDoFn();
-
-    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> baseRunner = DoFnRunners.simpleRunner(
-        pipelineOptions,
-        doFn,
-        NullSideInputReader.empty(),
-        outputManager,
-        mainTupleTag,
-        sideOutputTags,
-        stepContext,
-        windowingStrategy);
-
-    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> lateDroppingRunner =
-        DoFnRunners.lateDataDroppingRunner(baseRunner, stepContext, windowingStrategy);
-
-    return new DoFnRunnerWithMetrics<>(
-        stepName, lateDroppingRunner, MetricsReporter.create(metricClient));
-  }
-
-  /**
-   * Processes one {@code KV} element.
-   *
-   * @param tag tag the element arrived under; not used
-   * @param elem a windowed {@code KV} element
-   */
-  @Override
-  public void process(TupleTag tag, WindowedValue elem) {
-    // For GroupByKey a KV element is received. It has to be converted into a KeyedWorkItem
-    // first, which is the type LateDataDroppingDoFnRunner expects.
-    KeyedWorkItem<K, V> workItem =
-        RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
-    runner.processElement(elem.withValue(workItem));
-  }
-
-  /**
-   * Delivers a fired timer to the runner as a timers-only work item for the given key.
-   *
-   * @param key key the timer belongs to
-   * @param timerData the fired timer; its namespace must be a window namespace
-   */
-  @Override
-  public void onTimer(Object key, TimerInternals.TimerData timerData) {
-    checkArgument(timerData.getNamespace() instanceof StateNamespaces.WindowNamespace);
-
-    KeyedWorkItem<K, V> timersOnly =
-        KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData));
-    runner.processElement(WindowedValue.valueInGlobalWindow(timersOnly));
-  }
-
-  @Override
-  public String toString() {
-    return super.toString();
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
deleted file mode 100644
index a022440..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
-
-import com.alibaba.jstorm.common.metric.AsmCounter;
-import com.alibaba.jstorm.metric.MetricClient;
-import com.google.common.collect.Maps;
-import java.util.Map;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsFilter;
-
-/**
- * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine.
- *
- * <p>Only counters are reported. For each counter the last reported value is remembered, so
- * that each call to {@link #updateMetrics()} adds only the increase since the previous call.
- */
-public class MetricsReporter {
-
-  private static final String METRIC_KEY_SEPARATOR = "__";
-  private static final String COUNTER_PREFIX = "__counter";
-
-  private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
-  /** Last value reported to JStorm, keyed by the generated metric name. */
-  private final Map<String, Long> reportedCounters = Maps.newHashMap();
-  private final MetricClient metricClient;
-
-  /**
-   * Creates a reporter that publishes through the given client.
-   *
-   * @param metricClient JStorm metric client; must not be null
-   * @return a new reporter
-   */
-  public static MetricsReporter create(MetricClient metricClient) {
-    return new MetricsReporter(metricClient);
-  }
-
-  private MetricsReporter(MetricClient metricClient) {
-    this.metricClient = checkNotNull(metricClient, "metricClient");
-  }
-
-  /**
-   * Returns the metrics container of the given step.
-   *
-   * @param stepName name of the step
-   * @return the container held for that step
-   */
-  public MetricsContainer getMetricsContainer(String stepName) {
-    return metricsContainers.getContainer(stepName);
-  }
-
-  /** Queries all attempted metrics of the held containers and reports the counters. */
-  public void updateMetrics() {
-    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
-    MetricQueryResults metricQueryResults =
-        metricResults.queryMetrics(MetricsFilter.builder().build());
-    updateCounters(metricQueryResults.counters());
-  }
-
-  /**
-   * Adds to each JStorm counter the amount by which the Beam counter grew since the last
-   * report. Counters that did not grow are left untouched.
-   */
-  private void updateCounters(Iterable<MetricResult<Long>> counters) {
-    for (MetricResult<Long> metricResult : counters) {
-      String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
-      Long updateValue = metricResult.attempted();
-      Long oldValue = reportedCounters.get(metricName);
-
-      if (oldValue == null || oldValue < updateValue) {
-        AsmCounter counter = metricClient.registerCounter(metricName);
-        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
-        counter.update(incValue);
-        // Remember what has been reported; without this the full attempted value would be
-        // added again on every call instead of only the increase.
-        reportedCounters.put(metricName, updateValue);
-      }
-    }
-  }
-
-  /** Builds the metric name from the prefix, step, namespace and name, joined by "__". */
-  private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
-    return prefix
-        + METRIC_KEY_SEPARATOR + metricResult.step()
-        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
-        + METRIC_KEY_SEPARATOR + metricResult.name().name();
-  }
-}