You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:25 UTC
[25/53] [abbrv] beam git commit: jstorm-runner: move most classes to
translation package and reduce their visibility to package-private.
jstorm-runner: move most classes to translation package and reduce their visibility to package-private.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82653534
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82653534
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82653534
Branch: refs/heads/jstorm-runner
Commit: 82653534b0b738ee84ed94a67f9344393778d033
Parents: 9309ac4
Author: Pei He <pe...@apache.org>
Authored: Fri Jul 14 15:28:53 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:57 2017 +0800
----------------------------------------------------------------------
.../beam/runners/jstorm/JStormRunner.java | 14 +-
.../jstorm/serialization/package-info.java | 22 ++
.../jstorm/translation/AbstractComponent.java | 67 ++++
.../translation/BoundedSourceTranslator.java | 49 +++
.../jstorm/translation/CommonInstance.java | 28 ++
.../jstorm/translation/DefaultStepContext.java | 90 +++++
.../jstorm/translation/DoFnExecutor.java | 339 +++++++++++++++++
.../translation/DoFnRunnerWithMetrics.java | 91 +++++
.../runners/jstorm/translation/Executor.java | 36 ++
.../jstorm/translation/ExecutorContext.java | 41 ++
.../jstorm/translation/ExecutorsBolt.java | 338 +++++++++++++++++
.../jstorm/translation/FlattenExecutor.java | 60 +++
.../jstorm/translation/FlattenTranslator.java | 49 +++
.../translation/GroupByKeyTranslator.java | 71 ++++
.../translation/GroupByWindowExecutor.java | 173 +++++++++
.../jstorm/translation/JStormBagState.java | 180 +++++++++
.../translation/JStormCombiningState.java | 88 +++++
.../jstorm/translation/JStormMapState.java | 158 ++++++++
.../translation/JStormPipelineTranslator.java | 2 -
.../translation/JStormStateInternals.java | 190 ++++++++++
.../translation/JStormTimerInternals.java | 97 +++++
.../jstorm/translation/JStormValueState.java | 82 ++++
.../translation/JStormWatermarkHoldState.java | 82 ++++
.../jstorm/translation/MetricsReporter.java | 87 +++++
.../translation/MultiOutputDoFnExecutor.java | 79 ++++
.../translation/MultiStatefulDoFnExecutor.java | 70 ++++
.../translation/ParDoBoundMultiTranslator.java | 114 ++++++
.../translation/ParDoBoundTranslator.java | 107 ++++++
.../runners/jstorm/translation/RunnerUtils.java | 51 +++
.../translation/SerializedPipelineOptions.java | 65 ++++
.../translation/SingletonKeyedWorkItem.java | 62 +++
.../translation/StatefulDoFnExecutor.java | 68 ++++
.../beam/runners/jstorm/translation/Stream.java | 104 +++++
.../jstorm/translation/TimerService.java | 51 +++
.../jstorm/translation/TimerServiceImpl.java | 155 ++++++++
.../jstorm/translation/TransformTranslator.java | 79 ++++
.../jstorm/translation/TranslationContext.java | 6 -
.../jstorm/translation/TranslatorRegistry.java | 11 +-
.../jstorm/translation/TxExecutorsBolt.java | 133 +++++++
.../translation/TxUnboundedSourceSpout.java | 156 ++++++++
.../translation/UnboundedSourceSpout.java | 189 +++++++++
.../translation/UnboundedSourceTranslator.java | 44 +++
.../jstorm/translation/ViewExecutor.java | 56 +++
.../jstorm/translation/ViewTranslator.java | 378 ++++++++++++++++++
.../translation/WindowAssignExecutor.java | 112 ++++++
.../translation/WindowAssignTranslator.java | 41 ++
.../jstorm/translation/package-info.java | 22 ++
.../translation/runtime/AbstractComponent.java | 68 ----
.../translation/runtime/DoFnExecutor.java | 343 -----------------
.../runtime/DoFnRunnerWithMetrics.java | 91 -----
.../jstorm/translation/runtime/Executor.java | 36 --
.../translation/runtime/ExecutorContext.java | 41 --
.../translation/runtime/ExecutorsBolt.java | 339 -----------------
.../translation/runtime/FlattenExecutor.java | 60 ---
.../runtime/GroupByWindowExecutor.java | 177 ---------
.../translation/runtime/MetricsReporter.java | 87 -----
.../runtime/MultiOutputDoFnExecutor.java | 79 ----
.../runtime/MultiStatefulDoFnExecutor.java | 72 ----
.../runtime/StatefulDoFnExecutor.java | 70 ----
.../translation/runtime/TimerService.java | 51 ---
.../translation/runtime/TimerServiceImpl.java | 155 --------
.../translation/runtime/TxExecutorsBolt.java | 133 -------
.../runtime/TxUnboundedSourceSpout.java | 156 --------
.../runtime/UnboundedSourceSpout.java | 191 ----------
.../translation/runtime/ViewExecutor.java | 56 ---
.../runtime/WindowAssignExecutor.java | 112 ------
.../runtime/state/JStormBagState.java | 180 ---------
.../runtime/state/JStormCombiningState.java | 88 -----
.../runtime/state/JStormMapState.java | 158 --------
.../runtime/state/JStormStateInternals.java | 191 ----------
.../runtime/state/JStormValueState.java | 82 ----
.../runtime/state/JStormWatermarkHoldState.java | 83 ----
.../runtime/timer/JStormTimerInternals.java | 100 -----
.../translator/BoundedSourceTranslator.java | 51 ---
.../translator/FlattenTranslator.java | 51 ---
.../translator/GroupByKeyTranslator.java | 73 ----
.../translator/ParDoBoundMultiTranslator.java | 118 ------
.../translator/ParDoBoundTranslator.java | 110 ------
.../jstorm/translation/translator/Stream.java | 104 -----
.../translator/TransformTranslator.java | 80 ----
.../translator/UnboundedSourceTranslator.java | 46 ---
.../translation/translator/ViewTranslator.java | 380 -------------------
.../translator/WindowAssignTranslator.java | 43 ---
.../jstorm/translation/util/CommonInstance.java | 28 --
.../translation/util/DefaultStepContext.java | 90 -----
.../beam/runners/jstorm/util/RunnerUtils.java | 55 ---
.../jstorm/util/SerializedPipelineOptions.java | 65 ----
.../jstorm/util/SingletonKeyedWorkItem.java | 62 ---
.../translation/JStormStateInternalsTest.java | 221 +++++++++++
.../runtime/state/JStormStateInternalsTest.java | 222 -----------
90 files changed, 4783 insertions(+), 4802 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 8782130..baf4e5a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -38,15 +38,15 @@ import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer;
import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer;
import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer;
import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer;
+import org.apache.beam.runners.jstorm.translation.AbstractComponent;
+import org.apache.beam.runners.jstorm.translation.CommonInstance;
+import org.apache.beam.runners.jstorm.translation.ExecutorsBolt;
import org.apache.beam.runners.jstorm.translation.JStormPipelineTranslator;
+import org.apache.beam.runners.jstorm.translation.Stream;
import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent;
-import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
-import org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt;
-import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-import org.apache.beam.runners.jstorm.translation.translator.Stream;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import org.apache.beam.runners.jstorm.translation.TxExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.TxUnboundedSourceSpout;
+import org.apache.beam.runners.jstorm.translation.UnboundedSourceSpout;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java
new file mode 100644
index 0000000..f5ac931
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of kryo serializers.
+ */
+package org.apache.beam.runners.jstorm.serialization;
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java
new file mode 100644
index 0000000..35ae88d
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.topology.IComponent;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Enables users to add output stream definitions through the API rather than hard-coding them.
+ */
+public abstract class AbstractComponent implements IComponent {
+ private Map<String, Fields> streamToFields = new HashMap<>();
+ private Map<String, Boolean> keyStreams = new HashMap<>();
+ private int parallelismNum = 0;
+
+ public void addOutputField(String streamId) {
+ addOutputField(streamId, new Fields(CommonInstance.VALUE));
+ }
+
+ public void addOutputField(String streamId, Fields fields) {
+ streamToFields.put(streamId, fields);
+ keyStreams.put(streamId, false);
+ }
+
+ public void addKVOutputField(String streamId) {
+ streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
+ keyStreams.put(streamId, true);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
+ declarer.declareStream(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public boolean keyedEmit(String streamId) {
+ Boolean isKeyedStream = keyStreams.get(streamId);
+ return isKeyedStream == null ? false : isKeyedStream;
+ }
+
+ public int getParallelismNum() {
+ return parallelismNum;
+ }
+
+ public void setParallelismNum(int num) {
+ parallelismNum = num;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
new file mode 100644
index 0000000..f64193e
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Read.Bounded} into a Storm spout.
+ *
+ * @param <T>
+ */
+class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
+
+ @Override
+ public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ String description =
+ describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+
+ TupleTag<?> outputTag = userGraphContext.getOutputTag();
+ PValue outputValue = userGraphContext.getOutput();
+ UnboundedSourceSpout spout = new UnboundedSourceSpout(
+ description,
+ new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
+ userGraphContext.getOptions(), outputTag);
+
+ context.getExecutionGraphContext().registerSpout(
+ spout, TaggedPValue.of(outputTag, outputValue));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
new file mode 100644
index 0000000..b7154cd
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+/**
+ * Common definitions for the JStorm runner.
+ */
+public class CommonInstance {
+ public static final String KEY = "Key";
+ public static final String VALUE = "Value";
+
+ public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java
new file mode 100644
index 0000000..9fd584b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Default StepContext for running a DoFn. This does not allow accessing state or timer internals.
+ */
+class DefaultStepContext implements ExecutionContext.StepContext {
+
+ private TimerInternals timerInternals;
+
+ private StateInternals stateInternals;
+
+ public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
+ this.timerInternals = checkNotNull(timerInternals, "timerInternals");
+ this.stateInternals = checkNotNull(stateInternals, "stateInternals");
+ }
+
+ @Override
+ public String getStepName() {
+ return null;
+ }
+
+ @Override
+ public String getTransformName() {
+ return null;
+ }
+
+ @Override
+ public void noteOutput(WindowedValue<?> windowedValue) {
+
+ }
+
+ @Override
+ public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
+
+ }
+
+ @Override
+ public <T, W extends BoundedWindow> void writePCollectionViewData(
+ TupleTag<?> tag, Iterable<WindowedValue<T>> data,
+ Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder)
+ throws IOException {
+ throw new UnsupportedOperationException("Writing side-input data is not supported.");
+ }
+
+ @Override
+ public StateInternals stateInternals() {
+ return stateInternals;
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ return timerInternals;
+ }
+
+ public void setStateInternals(StateInternals stateInternals) {
+ this.stateInternals = stateInternals;
+ }
+
+ public void setTimerInternals(TimerInternals timerInternals) {
+ this.timerInternals = timerInternals;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
new file mode 100644
index 0000000..fdd9af6
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.metric.MetricClient;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm {@link Executor} for {@link DoFn}.
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
+class DoFnExecutor<InputT, OutputT> implements Executor {
+ private static final long serialVersionUID = 5297603063991078668L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
+
+ /**
+ * Implements {@link OutputManager} in a DoFn executor.
+ */
+ public class DoFnExecutorOutputManager implements OutputManager, Serializable {
+ private static final long serialVersionUID = -661113364735206170L;
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ executorsBolt.processExecutorElem(tag, output);
+ }
+ }
+
+ protected transient DoFnRunner<InputT, OutputT> runner = null;
+ protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
+
+ protected final String stepName;
+
+ protected int internalDoFnExecutorId;
+
+ protected final String description;
+
+ protected final TupleTag<OutputT> mainTupleTag;
+ protected final List<TupleTag<?>> sideOutputTags;
+
+ protected SerializedPipelineOptions serializedOptions;
+ protected transient JStormPipelineOptions pipelineOptions;
+
+ protected DoFn<InputT, OutputT> doFn;
+ protected final Coder<WindowedValue<InputT>> inputCoder;
+ protected DoFnInvoker<InputT, OutputT> doFnInvoker;
+ protected OutputManager outputManager;
+ protected WindowingStrategy<?, ?> windowingStrategy;
+ protected final TupleTag<InputT> mainInputTag;
+ protected Collection<PCollectionView<?>> sideInputs;
+ protected SideInputHandler sideInputHandler;
+ protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
+
+ // Initialize during runtime
+ protected ExecutorContext executorContext;
+ protected ExecutorsBolt executorsBolt;
+ protected TimerInternals timerInternals;
+ protected transient StateInternals pushbackStateInternals;
+ protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
+ protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
+ protected transient IKvStoreManager kvStoreManager;
+ protected DefaultStepContext stepContext;
+ protected transient MetricClient metricClient;
+
+ public DoFnExecutor(
+ String stepName,
+ String description,
+ JStormPipelineOptions pipelineOptions,
+ DoFn<InputT, OutputT> doFn,
+ Coder<WindowedValue<InputT>> inputCoder,
+ WindowingStrategy<?, ?> windowingStrategy,
+ TupleTag<InputT> mainInputTag,
+ Collection<PCollectionView<?>> sideInputs,
+ Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+ TupleTag<OutputT> mainTupleTag,
+ List<TupleTag<?>> sideOutputTags) {
+ this.stepName = checkNotNull(stepName, "stepName");
+ this.description = checkNotNull(description, "description");
+ this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+ this.doFn = doFn;
+ this.inputCoder = inputCoder;
+ this.outputManager = new DoFnExecutorOutputManager();
+ this.windowingStrategy = windowingStrategy;
+ this.mainInputTag = mainInputTag;
+ this.sideInputs = sideInputs;
+ this.mainTupleTag = mainTupleTag;
+ this.sideOutputTags = sideOutputTags;
+ this.sideInputTagToView = sideInputTagToView;
+ }
+
+ protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
+ return new DoFnRunnerWithMetrics<>(
+ stepName,
+ DoFnRunners.simpleRunner(
+ this.pipelineOptions,
+ this.doFn,
+ this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
+ this.outputManager,
+ this.mainTupleTag,
+ this.sideOutputTags,
+ this.stepContext,
+ this.windowingStrategy),
+ MetricsReporter.create(metricClient));
+ }
+
+ protected void initService(ExecutorContext context) {
+ // TODO: what should be set for key in here?
+ timerInternals = new JStormTimerInternals(
+ null /* key */, this, context.getExecutorsBolt().timerService());
+ kvStoreManager = context.getKvStoreManager();
+ stepContext = new DefaultStepContext(timerInternals,
+ new JStormStateInternals(
+ null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+ metricClient = new MetricClient(executorContext.getTopologyContext());
+ }
+
+ @Override
+ public void init(ExecutorContext context) {
+ this.executorContext = context;
+ this.executorsBolt = context.getExecutorsBolt();
+ this.pipelineOptions =
+ this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+
+ initService(context);
+
+ // Side inputs setup
+ if (sideInputs != null && !sideInputs.isEmpty()) {
+ pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
+ watermarkHoldTag =
+ StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
+ pushbackStateInternals = new JStormStateInternals(
+ null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+ sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
+ runner = getDoFnRunner();
+ pushbackRunner =
+ SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
+ } else {
+ runner = getDoFnRunner();
+ }
+
+ // Process user's setup
+ doFnInvoker = DoFnInvokers.invokerFor(doFn);
+ doFnInvoker.invokeSetup();
+ }
+
+ @Override
+ public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+ LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
+ tag, mainInputTag, sideInputs, elem.getValue()));
+ if (mainInputTag.equals(tag)) {
+ processMainInput(elem);
+ } else {
+ processSideInput(tag, elem);
+ }
+ }
+
+ protected <T> void processMainInput(WindowedValue<T> elem) {
+ if (sideInputs.isEmpty()) {
+ runner.processElement((WindowedValue<InputT>) elem);
+ } else {
+ Iterable<WindowedValue<InputT>> justPushedBack =
+ pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+ Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+ if (pushedBackValue.getTimestamp().isBefore(min)) {
+ min = pushedBackValue.getTimestamp();
+ }
+ min = earlier(min, pushedBackValue.getTimestamp());
+ pushedBack.add(pushedBackValue);
+ }
+ pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
+ }
+ }
+
+ protected void processSideInput(TupleTag tag, WindowedValue elem) {
+ LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
+
+ PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
+ sideInputHandler.addSideInputValue(sideInputView, elem);
+
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+ List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+
+ Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
+ if (pushedBackInputs != null) {
+ for (WindowedValue<InputT> input : pushedBackInputs) {
+
+ Iterable<WindowedValue<InputT>> justPushedBack =
+ pushbackRunner.processElementInReadyWindows(input);
+ Iterables.addAll(newPushedBack, justPushedBack);
+ }
+ }
+ pushedBack.clear();
+
+ Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+ min = earlier(min, pushedBackValue.getTimestamp());
+ pushedBack.add(pushedBackValue);
+ }
+
+ WatermarkHoldState watermarkHold =
+ pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+ // TODO: clear-then-add is not thread-safe.
+ watermarkHold.clear();
+ watermarkHold.add(min);
+ }
+
+ /**
+ * Process all pushed back elements when receiving watermark with max timestamp.
+ */
+ public void processAllPushBackElements() {
+ if (sideInputs != null && !sideInputs.isEmpty()) {
+ BagState<WindowedValue<InputT>> pushedBackElements =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+ if (pushedBackElements != null) {
+ for (WindowedValue<InputT> elem : pushedBackElements.read()) {
+ LOG.info("Process pushback elem={}", elem);
+ runner.processElement(elem);
+ }
+ pushedBackElements.clear();
+ }
+
+ WatermarkHoldState watermarkHold =
+ pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+ watermarkHold.clear();
+ watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ }
+ }
+
+ public void onTimer(Object key, TimerInternals.TimerData timerData) {
+ StateNamespace namespace = timerData.getNamespace();
+ checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+ BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+ if (pushbackRunner != null) {
+ pushbackRunner.onTimer(
+ timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+ } else {
+ runner.onTimer(
+ timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ doFnInvoker.invokeTeardown();
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+
+ private Instant earlier(Instant left, Instant right) {
+ return left.isBefore(right) ? left : right;
+ }
+
+ public void startBundle() {
+ if (pushbackRunner != null) {
+ pushbackRunner.startBundle();
+ } else {
+ runner.startBundle();
+ }
+ }
+
+ public void finishBundle() {
+ if (pushbackRunner != null) {
+ pushbackRunner.finishBundle();
+ } else {
+ runner.finishBundle();
+ }
+ }
+
+ public void setInternalDoFnExecutorId(int id) {
+ this.internalDoFnExecutorId = id;
+ }
+
+ public int getInternalDoFnExecutorId() {
+ return internalDoFnExecutorId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java
new file mode 100644
index 0000000..f614f1c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * {@link DoFnRunner} decorator that wraps every call to the delegate in a scope created from
+ * the {@link MetricsContainer} of the step.
+ */
+class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+ private final String stepName;
+ private final DoFnRunner<InputT, OutputT> delegate;
+ private final MetricsReporter metricsReporter;
+
+ /**
+ * @param stepName name used to look up the metrics container; must not be null
+ * @param delegate runner that receives every call; must not be null
+ * @param metricsReporter source of the metrics container; must not be null
+ */
+ DoFnRunnerWithMetrics(
+ String stepName,
+ DoFnRunner<InputT, OutputT> delegate,
+ MetricsReporter metricsReporter) {
+ this.stepName = checkNotNull(stepName, "stepName");
+ this.delegate = checkNotNull(delegate, "delegate");
+ this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter");
+ }
+
+ /**
+ * Opens the metrics scope for this step. The returned handle is closed by the caller's
+ * try-with-resources statement.
+ */
+ private Closeable openMetricsScope() {
+ return MetricsEnvironment.scopedMetricsContainer(
+ metricsReporter.getMetricsContainer(stepName));
+ }
+
+ @Override
+ public void startBundle() {
+ try (Closeable scope = openMetricsScope()) {
+ delegate.startBundle();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ public void processElement(WindowedValue<InputT> elem) {
+ try (Closeable scope = openMetricsScope()) {
+ delegate.processElement(elem);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ public void onTimer(
+ String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ try (Closeable scope = openMetricsScope()) {
+ delegate.onTimer(timerId, window, timestamp, timeDomain);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ /**
+ * Finishes the bundle inside the metrics scope, then asks the reporter to update its metrics
+ * once the scope has been closed.
+ */
+ @Override
+ public void finishBundle() {
+ try (Closeable scope = openMetricsScope()) {
+ delegate.finishBundle();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ metricsReporter.updateMetrics();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
new file mode 100644
index 0000000..145b224
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * An executor is a basic executable unit in a JStorm task.
+ */
+interface Executor extends Serializable {
+ /**
+ * Initialization during runtime.
+ *
+ * @param context runtime context handed to the executor
+ */
+ void init(ExecutorContext context);
+
+ /**
+ * Processes one element.
+ *
+ * @param tag tag under which the element arrives
+ * @param elem the windowed element to process
+ * @param <T> type of the element value
+ */
+ <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
+
+ /**
+ * Called when the executor is no longer needed, so that it can release what it holds.
+ */
+ void cleanup();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java
new file mode 100644
index 0000000..487db35
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.task.TopologyContext;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.google.auto.value.AutoValue;
+
+/**
+ * Runtime context of an {@link ExecutorsBolt}: the topology context, the bolt itself and the
+ * key-value store manager.
+ */
+@AutoValue
+abstract class ExecutorContext {
+ /**
+ * Creates a context from its three components.
+ */
+ public static ExecutorContext of(
+ TopologyContext topologyContext,
+ ExecutorsBolt bolt,
+ IKvStoreManager kvStoreManager) {
+ return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
+ }
+
+ /** Returns the topology context this context was created with. */
+ public abstract TopologyContext getTopologyContext();
+
+ /** Returns the bolt this context was created with. */
+ public abstract ExecutorsBolt getExecutorsBolt();
+
+ /** Returns the key-value store manager this context was created with. */
+ public abstract IKvStoreManager getKvStoreManager();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
new file mode 100644
index 0000000..ef12db8
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBatchBolt;
+import backtype.storm.tuple.ITupleExt;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreManagerFactory;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ExecutorsBolt is a JStorm Bolt composed of several executors chained in a sub-DAG.
+ */
+public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
+ private static final long serialVersionUID = -7751043327801735211L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
+
+ protected ExecutorContext executorContext;
+
+ protected TimerService timerService;
+
+ // map from input tag to executor inside bolt
+ protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
+ // set of all output tags that will be emitted outside the bolt
+ protected final Set<TupleTag> outputTags = Sets.newHashSet();
+ protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
+ protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
+ protected int internalDoFnExecutorId = 1;
+ protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
+
+ protected OutputCollector collector;
+
+ protected boolean isStatefulBolt = false;
+
+ protected KryoSerializer<WindowedValue> serializer;
+
+ /**
+ * Creates a bolt with no executors and no tags registered.
+ */
+ public ExecutorsBolt() {
+
+ }
+
+ /**
+ * Sets the stateful flag. {@code prepare} reads it to decide how the key-value store manager
+ * is obtained.
+ */
+ public void setStatefulBolt(boolean isStateful) {
+ isStatefulBolt = isStateful;
+ }
+
+ /**
+ * Maps an input tag to the executor that processes elements arriving under it.
+ *
+ * @param inputTag tag of the input; must not be null
+ * @param executor executor for that input; must not be null
+ */
+ public void addExecutor(TupleTag inputTag, Executor executor) {
+ checkNotNull(inputTag, "inputTag");
+ checkNotNull(executor, "executor");
+ inputTagToExecutor.put(inputTag, executor);
+ }
+
+ /**
+ * Returns the map from input tag to executor. This is the internal map itself, not a copy.
+ */
+ public Map<TupleTag, Executor> getExecutors() {
+ return inputTagToExecutor;
+ }
+
+ public void registerExecutor(Executor executor) {
+ if (executor instanceof DoFnExecutor) {
+ DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
+ idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
+ doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
+ internalDoFnExecutorId++;
+ }
+ }
+
+ /**
+ * Returns the map from internal id to {@link DoFnExecutor} filled by {@code registerExecutor}.
+ */
+ public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
+ return idToDoFnExecutor;
+ }
+
+ /**
+ * Adds a tag to the set of output tags.
+ */
+ public void addOutputTags(TupleTag tag) {
+ outputTags.add(tag);
+ }
+
+ /**
+ * Adds a tag to the set of external output tags. Elements processed under such a tag are
+ * emitted outside the bolt by {@code processExecutorElem}.
+ */
+ public void addExternalOutputTag(TupleTag<?> tag) {
+ externalOutputTags.add(tag);
+ }
+
+ /**
+ * Returns the set of output tags. This is the internal set itself, not a copy.
+ */
+ public Set<TupleTag> getOutputTags() {
+ return outputTags;
+ }
+
+ /**
+ * Returns the executor context. The field is only assigned in {@code prepare}, so this is
+ * {@code null} until then.
+ */
+ public ExecutorContext getExecutorContext() {
+ return executorContext;
+ }
+
+ /**
+ * Prepares this bolt on a task: keeps the collector, obtains the key-value store manager,
+ * builds the {@link ExecutorContext}, initializes the timer service and all executors, and
+ * creates the serializer for {@link WindowedValue}s.
+ *
+ * @param stormConf configuration used for the non-stateful store manager and the serializer
+ * @param context topology context of the current task
+ * @param collector collector kept for emitting tuples
+ * @throws RuntimeException wrapping an {@link IOException} raised while preparing
+ */
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ LOG.info("Start to prepare for task-{}", context.getThisTaskId());
+ try {
+ this.collector = collector;
+
+ // init kv store manager
+ String storeName = String.format("task-%d", context.getThisTaskId());
+ String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+ IKvStoreManager kvStoreManager = isStatefulBolt
+ ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+ context, storeName, stateStorePath, isStatefulBolt)
+ : KvStoreManagerFactory.getKvStoreManager(
+ stormConf, storeName, stateStorePath, isStatefulBolt);
+ // The context has to exist before initTimerService(), which reads it.
+ this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
+
+ // init time service
+ timerService = initTimerService();
+
+ // init all internal executors
+ // The values are copied into a set, so an executor mapped from several tags is visited once.
+ for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+ executor.init(executorContext);
+ if (executor instanceof DoFnExecutor) {
+ doFnExecutors.add((DoFnExecutor) executor);
+ }
+ }
+
+ this.serializer = new KryoSerializer<WindowedValue>(stormConf);
+
+ LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
+ LOG.info("inputTagToExecutor={}", inputTagToExecutor);
+ LOG.info("outputTags={}", outputTags);
+ LOG.info("externalOutputTags={}", externalOutputTags);
+ LOG.info("doFnExecutors={}", doFnExecutors);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to prepare executors bolt", e);
+ }
+ }
+
+ /**
+ * Creates the {@link TimerService} of this bolt and initializes it with task ids.
+ *
+ * <p>The ids are taken from {@code getThisSourceComponentTasks()} of the topology context;
+ * entries whose component id is a system component contribute no tasks. This method reads
+ * {@code executorContext}, so the context has to be set before it is called.
+ *
+ * @return the initialized timer service
+ */
+ public TimerService initTimerService() {
+ TopologyContext context = executorContext.getTopologyContext();
+ List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
+ .transformAndConcat(
+ new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
+ @Override
+ public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
+ if (Common.isSystemComponent(value.getKey())) {
+ // Typed empty list; the raw Collections.EMPTY_LIST needs an unchecked conversion.
+ return Collections.<Integer>emptyList();
+ } else {
+ return value.getValue();
+ }
+ }
+ })
+ .toList();
+ TimerService ret = new TimerServiceImpl(executorContext);
+ ret.init(tasks);
+ return ret;
+ }
+
+ /**
+ * Processes one batch tuple. A batch from the watermark stream is handled value by value as
+ * watermarks; any other batch is processed as elements inside one DoFn bundle.
+ *
+ * @param input the batch tuple; it is cast to {@link ITupleExt} to read the batch values
+ */
+ @Override
+ public void execute(Tuple input) {
+ String sourceStreamId = input.getSourceStreamId();
+ ITupleExt batchTuple = (ITupleExt) input;
+ Iterator<List<Object>> batch = batchTuple.batchValues().iterator();
+ if (!CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(sourceStreamId)) {
+ // Data batch: a single bundle spans all elements of the batch.
+ doFnStartBundle();
+ while (batch.hasNext()) {
+ processElement(batch.next(), sourceStreamId);
+ }
+ doFnFinishBundle();
+ return;
+ }
+ // Watermark batch: the first field of every value is the watermark.
+ while (batch.hasNext()) {
+ processWatermark((Long) batch.next().get(0), input.getSourceTask());
+ }
+ }
+
+ /**
+ * Handles a watermark received from a source task.
+ *
+ * <p>The timer service is updated with the received watermark. When it reports a non-zero
+ * watermark, timers are fired inside a DoFn bundle, and at the maximum timestamp all pushed-back
+ * elements are processed as well. Afterwards, if the bolt has external output tags, the current
+ * output watermark is emitted on the watermark stream.
+ *
+ * @param watermarkTs the received watermark; it is wrapped in an {@link Instant} for logging
+ * @param sourceTask id of the task the watermark came from
+ */
+ private void processWatermark(long watermarkTs, int sourceTask) {
+ long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
+ LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
+ (new Instant(watermarkTs)).toDateTime(),
+ sourceTask,
+ (new Instant(newWaterMark)).toDateTime());
+ // NOTE(review): 0 looks like the "nothing to fire" result of updateInputWatermark - confirm.
+ if (newWaterMark != 0) {
+ // Some buffer windows are going to be triggered.
+ doFnStartBundle();
+ timerService.fireTimers(newWaterMark);
+
+ // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
+ // to be received from now on. So we are going to process all push back data.
+ if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ for (DoFnExecutor doFnExecutor : doFnExecutors) {
+ doFnExecutor.processAllPushBackElements();
+ }
+ }
+
+ doFnFinishBundle();
+ }
+
+ long currentWaterMark = timerService.currentOutputWatermark();
+ if (!externalOutputTags.isEmpty()) {
+ // The collector is flushed before the watermark is emitted - presumably so that pending
+ // data tuples leave ahead of it; confirm against the collector's contract.
+ collector.flush();
+ collector.emit(
+ CommonInstance.BEAM_WATERMARK_STREAM_ID,
+ new Values(currentWaterMark));
+ LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
+ }
+ }
+
+ /**
+ * Rebuilds the windowed value from the tuple fields and processes it under a tag created
+ * from the stream id.
+ *
+ * @param values fields of one tuple of the batch
+ * @param streamId id of the stream the tuple arrived on
+ */
+ private void processElement(List<Object> values, String streamId) {
+ processExecutorElem(
+ new TupleTag(streamId),
+ retrieveWindowedValueFromTupleValue(values));
+ }
+
+ /**
+ * Routes an element to the executor registered for its tag and, when the tag is an external
+ * output tag, also emits it outside the bolt.
+ *
+ * <p>A {@code null} element is logged and dropped. The null check comes before any use of the
+ * element, so the debug log can no longer throw a {@link NullPointerException}.
+ *
+ * @param inputTag tag of the element
+ * @param elem the element to process; may be {@code null}
+ * @param <T> type of the element value
+ */
+ public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
+ if (elem == null) {
+ LOG.info("Received null elem for tag={}", inputTag);
+ return;
+ }
+ LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
+ Executor executor = inputTagToExecutor.get(inputTag);
+ if (executor != null) {
+ executor.process(inputTag, elem);
+ }
+ if (externalOutputTags.contains(inputTag)) {
+ emitOutsideBolt(inputTag, elem);
+ }
+ }
+
+ /**
+ * Cleans up every executor of this bolt and closes the key-value store manager.
+ *
+ * <p>The store manager is closed in a {@code finally} block, so it is released even when the
+ * cleanup of an executor throws.
+ */
+ @Override
+ public void cleanup() {
+ try {
+ // The values are copied into a set, so an executor mapped from several tags is visited once.
+ for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+ executor.cleanup();
+ }
+ } finally {
+ executorContext.getKvStoreManager().close();
+ }
+ }
+
+ /**
+ * Returns {@code null}: this bolt declares no component-specific configuration.
+ */
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ /**
+ * Returns the timer service currently held by this bolt.
+ */
+ public TimerService timerService() {
+ return timerService;
+ }
+
+ /**
+ * Replaces the timer service held by this bolt.
+ */
+ public void setTimerService(TimerService service) {
+ timerService = service;
+ }
+
+ /**
+ * Rebuilds a {@link WindowedValue} from the fields of a tuple.
+ *
+ * <p>With more than one field, field 0 is the key and field 1 the serialized windowed value;
+ * the result carries a {@link KV} of that key and the deserialized value. Otherwise field 0 is
+ * the serialized windowed value itself.
+ *
+ * @param values fields of the tuple
+ * @return the deserialized windowed value
+ */
+ private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
+ if (values.size() <= 1) {
+ return serializer.deserialize((byte[]) values.get(0));
+ }
+ Object key = values.get(0);
+ WindowedValue valueOnly = serializer.deserialize((byte[]) values.get(1));
+ return valueOnly.withValue(KV.of(key, valueOnly.getValue()));
+ }
+
+ /**
+ * Serializes an element and emits it on the stream named after the output tag.
+ *
+ * <p>For a keyed stream the {@code WindowedValue<KV>} is split into the key and a serialized
+ * {@code WindowedValue<V>}; for any other stream the whole windowed value is serialized.
+ *
+ * @param outputTag tag whose id is used as the stream id
+ * @param outputValue the element to emit
+ */
+ protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
+ LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
+ String streamId = outputTag.getId();
+ if (!keyedEmit(streamId)) {
+ byte[] serializedElement = serializer.serialize(outputValue);
+ collector.emit(streamId, new Values(serializedElement));
+ return;
+ }
+ // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+ KV kv = (KV) outputValue.getValue();
+ byte[] serializedValue = serializer.serialize(outputValue.withValue(kv.getValue()));
+ // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
+ Object emittedKey = kv.getKey() == null ? "null" : kv.getKey();
+ collector.emit(streamId, new Values(emittedKey, serializedValue));
+ }
+
+ /**
+ * Starts a bundle on every {@link DoFnExecutor} collected during {@code prepare}.
+ */
+ private void doFnStartBundle() {
+ for (DoFnExecutor doFnExecutor : doFnExecutors) {
+ doFnExecutor.startBundle();
+ }
+ }
+
+ /**
+ * Finishes the bundle on every {@link DoFnExecutor} collected during {@code prepare}.
+ */
+ private void doFnFinishBundle() {
+ for (DoFnExecutor doFnExecutor : doFnExecutors) {
+ doFnExecutor.finishBundle();
+ }
+ }
+
+ @Override
+ public String toString() {
+ // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString());
+ List<String> ret = new ArrayList<>();
+ /*ret.add("inputTags");
+ for (TupleTag inputTag : inputTagToExecutor.keySet()) {
+ ret.add(inputTag.getId());
+ }*/
+ ret.add("internalExecutors");
+ for (Executor executor : inputTagToExecutor.values()) {
+ ret.add(executor.toString());
+ }
+ ret.add("externalOutputTags");
+ for (TupleTag output : externalOutputTags) {
+ ret.add(output.getId());
+ }
+ return Joiner.on('\n').join(ret).concat("\n");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
new file mode 100644
index 0000000..a64f494
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}.
+ * @param <InputT>
+ */
+class FlattenExecutor<InputT> implements Executor {
+
+ private final String description;
+ private TupleTag mainOutputTag;
+ private ExecutorContext context;
+ private ExecutorsBolt executorsBolt;
+
+ public FlattenExecutor(String description, TupleTag mainTupleTag) {
+ this.description = checkNotNull(description, "description");
+ this.mainOutputTag = mainTupleTag;
+ }
+
+ @Override
+ public void init(ExecutorContext context) {
+ this.context = context;
+ this.executorsBolt = context.getExecutorsBolt();
+ }
+
+ @Override
+ public void process(TupleTag tag, WindowedValue elem) {
+ executorsBolt.processExecutorElem(mainOutputTag, elem);
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
new file mode 100644
index 0000000..89708df
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}.
+ * @param <V>
+ */
+class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
+
+ /**
+ * Creates a {@link FlattenExecutor} for the transform and adds it to the translation context
+ * together with the expanded inputs and the outputs of the current node.
+ *
+ * @param transform the flatten transform being translated
+ * @param context translation context that supplies the user graph and receives the executor
+ */
+ @Override
+ public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+ // Since a new tag is created in PCollectionList, retrieve the real tag here.
+ Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
+ for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
+ PCollection<V> pc = (PCollection<V>) entry.getValue();
+ inputs.putAll(pc.expand());
+ }
+ // The expanded inputs are passed to describeTransform; nothing is printed to stdout here.
+ String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
+ FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+ context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..85c958a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}.
+ *
+ * <p>The fields below are overwritten on every call to {@link #translateNode}.
+ * @param <K>
+ * @param <V>
+ */
+class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
+ // information of transform
+ protected PCollection<KV<K, V>> input;
+ protected PCollection<KV<K, Iterable<V>>> output;
+ protected List<TupleTag<?>> inputTags;
+ protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
+ protected List<TupleTag<?>> sideOutputTags;
+ protected List<PCollectionView<?>> sideInputs;
+ protected WindowingStrategy<?, ?> windowingStrategy;
+
+ /**
+ * Reads input, output and tags of the current node from the user graph context, then creates
+ * a {@link GroupByWindowExecutor} and adds it to the translation context.
+ *
+ * @param transform the group-by-key transform being translated
+ * @param context translation context that supplies the user graph and receives the executor
+ */
+ @Override
+ public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ String description =
+ describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+
+ input = (PCollection<KV<K, V>>) userGraphContext.getInput();
+ output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
+
+ inputTags = userGraphContext.getInputTags();
+ mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
+ // No side outputs and no side inputs are set up here: both lists are empty.
+ sideOutputTags = Lists.newArrayList();
+
+ sideInputs = Collections.<PCollectionView<?>>emptyList();
+ // The executor is given the windowing strategy of the input collection.
+ windowingStrategy = input.getWindowingStrategy();
+
+ GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ context,
+ context.getUserGraphContext().getOptions(),
+ windowingStrategy,
+ mainOutputTag,
+ sideOutputTags);
+ context.addTransformExecutor(groupByWindowExecutor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
new file mode 100644
index 0000000..bf6e1ad
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
+ *
+ * <p>Elements arrive as {@code KV<K, V>} and are converted into {@link KeyedWorkItem}s before
+ * being handed to a runner built around {@link GroupAlsoByWindowViaWindowSetNewDoFn}.
+ *
+ * @param <K> type of the grouping key
+ * @param <V> type of the values being grouped
+ */
+class GroupByWindowExecutor<K, V>
+    extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
+  private static final long serialVersionUID = -7563050475488610553L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
+
+  /** Output manager that hands every emitted value to {@code executorsBolt}. */
+  private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      executorsBolt.processExecutorElem(tag, output);
+    }
+  }
+
+  private KvCoder<K, V> inputKvCoder;
+  private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
+
+  /**
+   * Creates the executor.
+   *
+   * @param stepName name of the step, forwarded to the super class
+   * @param description description of the transform, forwarded to the super class
+   * @param context translation context; its user graph input provides the input {@link KvCoder}
+   * @param pipelineOptions pipeline options, forwarded to the super class
+   * @param windowingStrategy windowing strategy, forwarded to the super class
+   * @param mainTupleTag tag of the grouped output
+   * @param sideOutputTags tags of additional outputs
+   */
+  public GroupByWindowExecutor(
+      String stepName,
+      String description,
+      TranslationContext context,
+      JStormPipelineOptions pipelineOptions,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV<K, Iterable<V>>> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags) {
+    // The DoFn is only built at runtime (see getDoFnRunner()), so the super arguments that
+    // cannot be supplied yet are passed as null.
+    super(
+        stepName,
+        description,
+        pipelineOptions,
+        null,
+        null,
+        windowingStrategy,
+        null,
+        null,
+        null,
+        mainTupleTag,
+        sideOutputTags);
+
+    UserGraphContext graph = context.getUserGraphContext();
+    PCollection<KV<K, V>> groupedInput = (PCollection<KV<K, V>>) graph.getInput();
+    this.inputKvCoder = (KvCoder<K, V>) groupedInput.getCoder();
+    this.outputManager = new GroupByWindowOutputManager();
+  }
+
+  /**
+   * Builds the group-also-by-window DoFn, wiring in per-key state and timer factories and a
+   * buffering {@link SystemReduceFn} over the input value coder.
+   */
+  private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
+    final StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
+      @Override
+      public StateInternals stateInternalsForKey(K key) {
+        return new JStormStateInternals<K>(
+            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+      }
+    };
+    TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
+      @Override
+      public TimerInternals timerInternalsForKey(K key) {
+        return new JStormTimerInternals<>(
+            key,
+            GroupByWindowExecutor.this,
+            executorContext.getExecutorsBolt().timerService());
+      }
+    };
+
+    reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+    return GroupAlsoByWindowViaWindowSetNewDoFn.create(
+        windowingStrategy,
+        stateInternalsFactory,
+        timerInternalsFactory,
+        NullSideInputReader.empty(),
+        (SystemReduceFn) reduceFn,
+        outputManager,
+        mainTupleTag);
+  }
+
+  /**
+   * Creates the DoFn and wraps it in a simple runner, a late-data-dropping runner and finally a
+   * metrics-reporting runner.
+   */
+  @Override
+  protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
+    // Must happen first: the simple runner below reads this.doFn.
+    doFn = getGroupByWindowDoFn();
+
+    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> baseRunner =
+        DoFnRunners.simpleRunner(
+            this.pipelineOptions,
+            this.doFn,
+            NullSideInputReader.empty(),
+            this.outputManager,
+            this.mainTupleTag,
+            this.sideOutputTags,
+            this.stepContext,
+            this.windowingStrategy);
+
+    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> lateDataDroppingRunner =
+        DoFnRunners.lateDataDroppingRunner(
+            baseRunner, this.stepContext, this.windowingStrategy);
+
+    return new DoFnRunnerWithMetrics<>(
+        stepName, lateDataDroppingRunner, MetricsReporter.create(metricClient));
+  }
+
+  @Override
+  public void process(TupleTag tag, WindowedValue elem) {
+    // GroupByKey receives KV elements, but the late-data-dropping runner expects
+    // KeyedWorkItems, so the element is converted before it is processed.
+    KeyedWorkItem<K, V> workItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
+    runner.processElement(elem.withValue(workItem));
+  }
+
+  /**
+   * Fires a timer by feeding a timers-only {@link KeyedWorkItem} for {@code key} to the runner.
+   *
+   * @param key key the timer belongs to; cast to {@code K}
+   * @param timerData the timer to fire; its namespace must be a window namespace
+   */
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    StateNamespace timerNamespace = timerData.getNamespace();
+    checkArgument(timerNamespace instanceof StateNamespaces.WindowNamespace);
+
+    KeyedWorkItem<K, V> timerWorkItem =
+        KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData));
+    runner.processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+  }
+
+  @Override
+  public String toString() {
+    return super.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
new file mode 100644
index 0000000..3e5d52b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link BagState} in JStorm runner.
+ *
+ * <p>Each element is stored in {@code kvState} under a composed key of (key, namespace, index).
+ * The index of the most recently added element is stored in {@code stateInfoKvState} under the
+ * composed key (key, namespace).
+ *
+ * @param <K> type of the key this state belongs to
+ * @param <T> type of the elements held in the bag
+ */
+class JStormBagState<K, T> implements BagState<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
+
+  @Nullable
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<ComposedKey, T> kvState;
+  private final IKvStore<ComposedKey, Object> stateInfoKvState;
+  // Index the next added element will be stored at.
+  private int elemIndex;
+
+  /**
+   * Creates the bag state and restores the element index from {@code stateInfoKvState}.
+   *
+   * @param key key this state belongs to, may be null
+   * @param namespace state namespace, must not be null
+   * @param kvState store holding the bag elements, must not be null
+   * @param stateInfoKvState store holding the index of the last added element, must not be null
+   * @throws IOException if the stored index cannot be read
+   */
+  public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
+      IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+    this.key = key;
+    this.namespace = checkNotNull(namespace, "namespace");
+    this.kvState = checkNotNull(kvState, "kvState");
+    this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+
+    // The store keeps the index of the last element, so the next free slot is one past it.
+    Integer lastIndex = (Integer) stateInfoKvState.get(getComposedKey());
+    this.elemIndex = lastIndex != null ? lastIndex + 1 : 0;
+  }
+
+  /**
+   * Appends {@code input} to the bag.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if a store write fails
+   */
+  @Override
+  public void add(T input) {
+    try {
+      kvState.put(getComposedKey(elemIndex), input);
+      stateInfoKvState.put(getComposedKey(), elemIndex);
+      elemIndex++;
+    } catch (IOException e) {
+      // Wrap the IOException itself; wrapping e.getCause() loses it and may wrap null.
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return new ReadableState<Boolean>() {
+      @Override
+      public Boolean read() {
+        return elemIndex <= 0;
+      }
+
+      @Override
+      public ReadableState<Boolean> readLater() {
+        // TODO: support prefetch.
+        return this;
+      }
+    };
+  }
+
+  @Override
+  public Iterable<T> read() {
+    return new BagStateIterable(elemIndex);
+  }
+
+  @Override
+  public BagState<T> readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  /**
+   * Removes every element and the stored index.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if a store removal fails
+   */
+  @Override
+  public void clear() {
+    try {
+      for (int i = 0; i < elemIndex; i++) {
+        kvState.remove(getComposedKey(i));
+      }
+      stateInfoKvState.remove(getComposedKey());
+      elemIndex = 0;
+    } catch (IOException e) {
+      // Wrap the IOException itself; wrapping e.getCause() loses it and may wrap null.
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Key under which the index of the last added element is stored. */
+  private ComposedKey getComposedKey() {
+    return ComposedKey.of(key, namespace);
+  }
+
+  /** Key under which the element at {@code elemIndex} is stored. */
+  private ComposedKey getComposedKey(int elemIndex) {
+    return ComposedKey.of(key, namespace, elemIndex);
+  }
+
+  /**
+   * Implementation of Bag state Iterable.
+   */
+  private class BagStateIterable implements KvStoreIterable<T> {
+
+    /** Iterator that looks the element count up in the store when it is created. */
+    private class BagStateIterator implements Iterator<T> {
+      private final int size;
+      private int cursor = 0;
+
+      BagStateIterator() {
+        Integer lastIndex = null;
+        try {
+          lastIndex = (Integer) stateInfoKvState.get(getComposedKey());
+        } catch (IOException e) {
+          // NOTE(review): on failure the bag is iterated as empty; confirm this is intended.
+          LOG.error("Failed to get elemIndex for key={}", getComposedKey(), e);
+        }
+        this.size = lastIndex != null ? lastIndex + 1 : 0;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return cursor < size;
+      }
+
+      @Override
+      public T next() {
+        if (cursor >= size) {
+          throw new NoSuchElementException();
+        }
+
+        T value = null;
+        try {
+          value = kvState.get(getComposedKey(cursor));
+        } catch (IOException e) {
+          // NOTE(review): on failure a null element is returned; confirm this is intended.
+          LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor), e);
+        }
+        cursor++;
+        return value;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    // NOTE(review): never read; iterators take their size from the store instead.
+    private final int size;
+
+    BagStateIterable(int size) {
+      this.size = size;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      return new BagStateIterator();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("BagStateIterable: composedKey=%s", getComposedKey());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
new file mode 100644
index 0000000..6bd021f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * JStorm implementation of {@link CombiningState}.
+ */
+class JStormCombiningState<InputT, AccumT, OutputT>
+ implements CombiningState<InputT, AccumT, OutputT> {
+
+ @Nullable
+ private final BagState<AccumT> accumBagState;
+ private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+
+ JStormCombiningState(
+ BagState<AccumT> accumBagState,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ this.accumBagState = checkNotNull(accumBagState, "accumBagState");
+ this.combineFn = checkNotNull(combineFn, "combineFn");
+ }
+
+ @Override
+ public AccumT getAccum() {
+ // TODO: replacing the accumBagState with the merged accum.
+ return combineFn.mergeAccumulators(accumBagState.read());
+ }
+
+ @Override
+ public void addAccum(AccumT accumT) {
+ accumBagState.add(accumT);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
+ return combineFn.mergeAccumulators(iterable);
+ }
+
+ @Override
+ public void add(InputT input) {
+ accumBagState.add(
+ combineFn.addInput(combineFn.createAccumulator(), input));
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return accumBagState.isEmpty();
+ }
+
+ @Override
+ public OutputT read() {
+ return combineFn.extractOutput(
+ combineFn.mergeAccumulators(accumBagState.read()));
+ }
+
+ @Override
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ accumBagState.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
new file mode 100644
index 0000000..6a4e376
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.alibaba.jstorm.cache.IKvStore;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link MapState} in JStorm runner.
+ *
+ * <p>Every operation delegates to the supplied {@link IKvStore}. An {@link IOException} from the
+ * store is logged and rethrown as a {@link RuntimeException} that keeps the original as cause.
+ *
+ * @param <K> type of the map keys
+ * @param <V> type of the map values
+ */
+class JStormMapState<K, V> implements MapState<K, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
+
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<K, V> kvStore;
+
+  /**
+   * @param key key this state belongs to
+   * @param namespace state namespace
+   * @param kvStore store backing the map
+   */
+  public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+    this.key = key;
+    this.namespace = namespace;
+    this.kvStore = kvStore;
+  }
+
+  @Override
+  public void put(K var1, V var2) {
+    try {
+      kvStore.put(var1, var2);
+    } catch (IOException e) {
+      throw logAndWrap(String.format("Failed to put key=%s, value=%s", var1, var2), e);
+    }
+  }
+
+  /**
+   * Stores {@code var2} under {@code var1} only when the store holds no value for that key.
+   *
+   * @return a state holding the previous value, or holding null when the new value was stored
+   */
+  @Override
+  public ReadableState<V> putIfAbsent(K var1, V var2) {
+    try {
+      V existing = kvStore.get(var1);
+      if (existing == null) {
+        kvStore.put(var1, var2);
+      }
+      return new MapReadableState<V>(existing);
+    } catch (IOException e) {
+      throw logAndWrap(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
+    }
+  }
+
+  @Override
+  public void remove(K var1) {
+    try {
+      kvStore.remove(var1);
+    } catch (IOException e) {
+      throw logAndWrap(String.format("Failed to remove key=%s", var1), e);
+    }
+  }
+
+  @Override
+  public ReadableState<V> get(K var1) {
+    try {
+      return new MapReadableState<V>(kvStore.get(var1));
+    } catch (IOException e) {
+      throw logAndWrap(String.format("Failed to get value for key=%s", var1), e);
+    }
+  }
+
+  @Override
+  public ReadableState<Iterable<K>> keys() {
+    try {
+      return new MapReadableState<Iterable<K>>(kvStore.keys());
+    } catch (IOException e) {
+      throw logAndWrap("Failed to get keys", e);
+    }
+  }
+
+  @Override
+  public ReadableState<Iterable<V>> values() {
+    try {
+      return new MapReadableState<Iterable<V>>(kvStore.values());
+    } catch (IOException e) {
+      throw logAndWrap("Failed to get values", e);
+    }
+  }
+
+  @Override
+  public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+    try {
+      return new MapReadableState<Iterable<Map.Entry<K, V>>>(kvStore.entries());
+    } catch (IOException e) {
+      throw logAndWrap("Failed to get entries", e);
+    }
+  }
+
+  /** Removes every key currently reported by the store. */
+  @Override
+  public void clear() {
+    try {
+      Iterable<K> keys = kvStore.keys();
+      kvStore.removeBatch(keys);
+    } catch (IOException e) {
+      throw logAndWrap("Failed to clear map state", e);
+    }
+  }
+
+  /**
+   * Logs the failure and builds the exception to throw, keeping {@code e} as its cause.
+   *
+   * @param errorInfo description of the failed operation
+   * @param e the I/O failure reported by the store
+   * @return the exception for the caller to throw
+   */
+  private static RuntimeException logAndWrap(String errorInfo, IOException e) {
+    LOG.error(errorInfo, e);
+    return new RuntimeException(errorInfo, e);
+  }
+
+  /** {@link ReadableState} over a value that has already been read. */
+  private static final class MapReadableState<T> implements ReadableState<T> {
+    private final T value;
+
+    MapReadableState(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public T read() {
+      return value;
+    }
+
+    @Override
+    public ReadableState<T> readLater() {
+      return this;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
index 1449a43..298ad32 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
@@ -22,8 +22,6 @@ import com.google.common.collect.Iterables;
import java.util.List;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;