You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:04 UTC
[04/53] [abbrv] beam git commit: jstorm-runner: rename the package to
org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
new file mode 100644
index 0000000..1de881f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.task.TopologyContext;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.google.auto.value.AutoValue;
+
+/**
+ * Value class bundling the objects an executor receives at initialization time: the
+ * {@link TopologyContext} of the task, the {@link ExecutorsBolt} that owns the executor and the
+ * {@link IKvStoreManager} used for state.
+ */
+@AutoValue
+public abstract class ExecutorContext {
+
+  /** Returns the topology context supplied to {@link #of}. */
+  public abstract TopologyContext getTopologyContext();
+
+  /** Returns the bolt supplied to {@link #of}. */
+  public abstract ExecutorsBolt getExecutorsBolt();
+
+  /** Returns the key-value store manager supplied to {@link #of}. */
+  public abstract IKvStoreManager getKvStoreManager();
+
+  /**
+   * Creates a context holding the given three objects.
+   *
+   * @param topologyContext topology context of the current task
+   * @param bolt bolt that hosts the executors
+   * @param kvStoreManager key-value store manager for the task
+   * @return a new {@link ExecutorContext}
+   */
+  public static ExecutorContext of(
+      TopologyContext topologyContext,
+      ExecutorsBolt bolt,
+      IKvStoreManager kvStoreManager) {
+    return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
new file mode 100644
index 0000000..9df1e17
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.io.IOException;
+import java.util.*;
+
+import avro.shaded.com.google.common.base.Joiner;
+import avro.shaded.com.google.common.collect.Sets;
+import backtype.storm.tuple.ITupleExt;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreManagerFactory;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Maps;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Bolt that hosts a set of Beam {@link Executor}s and routes elements between them.
+ *
+ * <p>Tuples received on a data stream are deserialized and dispatched to the executor registered
+ * for the stream id; elements whose tag is registered as an external output tag are serialized
+ * and emitted through the {@link OutputCollector}. Tuples received on the watermark stream update
+ * the {@link TimerService} and fire timers instead of being processed as elements.
+ */
+public class ExecutorsBolt extends AdaptorBasicBolt {
+  private static final long serialVersionUID = -7751043327801735211L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
+
+  protected ExecutorContext executorContext;
+
+  protected TimerService timerService;
+
+  // map from input tag to executor inside bolt
+  protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
+  // set of all output tags that will be emitted outside bolt
+  protected final Set<TupleTag> outputTags = Sets.newHashSet();
+  // tags checked in processExecutorElem to decide whether an element goes to the collector
+  protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
+  // DoFn executors among the registered executors; filled in prepare()
+  protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
+  // next id handed out by registerExecutor
+  protected int internalDoFnExecutorId = 1;
+  protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
+
+  protected OutputCollector collector;
+
+  protected boolean isStatefulBolt = false;
+
+  protected KryoSerializer<WindowedValue> serializer;
+
+  public ExecutorsBolt() {
+
+  }
+
+  /** Marks this bolt as stateful; selects which kv store manager {@link #prepare} creates. */
+  public void setStatefulBolt(boolean isStateful) {
+    isStatefulBolt = isStateful;
+  }
+
+  /**
+   * Registers the executor that consumes elements tagged with {@code inputTag}.
+   *
+   * @throws NullPointerException if either argument is null
+   */
+  public void addExecutor(TupleTag inputTag, Executor executor) {
+    inputTagToExecutor.put(
+        checkNotNull(inputTag, "inputTag"),
+        checkNotNull(executor, "executor"));
+  }
+
+  /** Returns the live (mutable) map from input tag to executor. */
+  public Map<TupleTag, Executor> getExecutors() {
+    return inputTagToExecutor;
+  }
+
+  /**
+   * Assigns the next internal id to {@code executor} if it is a {@link DoFnExecutor}; other
+   * executor types are ignored.
+   */
+  public void registerExecutor(Executor executor) {
+    if (executor instanceof DoFnExecutor) {
+      DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
+      idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
+      doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
+      internalDoFnExecutorId++;
+    }
+  }
+
+  /** Returns the live (mutable) map from internal id to {@link DoFnExecutor}. */
+  public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
+    return idToDoFnExecutor;
+  }
+
+  public void addOutputTags(TupleTag tag) {
+    outputTags.add(tag);
+  }
+
+  public void addExternalOutputTag(TupleTag<?> tag) {
+    externalOutputTags.add(tag);
+  }
+
+  public Set<TupleTag> getOutputTags() {
+    return outputTags;
+  }
+
+  /** Returns the context created in {@link #prepare}, or null before {@code prepare} ran. */
+  public ExecutorContext getExecutorContext() {
+    return executorContext;
+  }
+
+  /**
+   * Creates the kv store manager, the executor context, the timer service and the serializer,
+   * and initializes every registered executor.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if preparation fails
+   */
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    LOG.info("Start to prepare for task-{}", context.getThisTaskId());
+    try {
+      this.collector = collector;
+
+      // init kv store manager
+      String storeName = String.format("task-%d", context.getThisTaskId());
+      String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+      IKvStoreManager kvStoreManager = isStatefulBolt
+          ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+              context, storeName, stateStorePath, isStatefulBolt)
+          : KvStoreManagerFactory.getKvStoreManager(
+              stormConf, storeName, stateStorePath, isStatefulBolt);
+      this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
+
+      // init time service
+      timerService = initTimerService();
+
+      // init all internal executors; the set copy makes sure an executor registered under
+      // several input tags is initialized only once
+      for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+        executor.init(executorContext);
+        if (executor instanceof DoFnExecutor) {
+          doFnExecutors.add((DoFnExecutor) executor);
+        }
+      }
+
+      this.serializer = new KryoSerializer<WindowedValue>(stormConf);
+
+      LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
+      LOG.info("inputTagToExecutor={}", inputTagToExecutor);
+      LOG.info("outputTags={}", outputTags);
+      LOG.info("externalOutputTags={}", externalOutputTags);
+      LOG.info("doFnExecutors={}", doFnExecutors);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to prepare executors bolt", e);
+    }
+  }
+
+  /**
+   * Builds a {@link TimerService} initialized with the ids of all upstream tasks, skipping
+   * tasks that belong to system components.
+   */
+  public TimerService initTimerService() {
+    TopologyContext context = executorContext.getTopologyContext();
+    List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
+        .transformAndConcat(
+            new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
+              @Override
+              public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
+                if (Common.isSystemComponent(value.getKey())) {
+                  // Typed empty list instead of the raw Collections.EMPTY_LIST.
+                  return Collections.<Integer>emptyList();
+                } else {
+                  return value.getValue();
+                }
+              }
+            })
+        .toList();
+    TimerService ret = new TimerServiceImpl(executorContext);
+    ret.init(tasks);
+    return ret;
+  }
+
+  /**
+   * Processes one batch tuple: watermark values update the timer service, all other values are
+   * processed as elements inside a single start/finish bundle pair.
+   */
+  @Override
+  public void execute(Tuple input) {
+    // process a batch
+    String streamId = input.getSourceStreamId();
+    ITupleExt tuple = (ITupleExt) input;
+    Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
+    if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
+      while (valueIterator.hasNext()) {
+        processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
+      }
+    } else {
+      doFnStartBundle();
+      while (valueIterator.hasNext()) {
+        processElement(valueIterator.next(), streamId);
+      }
+      doFnFinishBundle();
+    }
+  }
+
+  /**
+   * Feeds a watermark received from {@code sourceTask} into the timer service, fires timers when
+   * the service reports a non-zero watermark, and forwards the current output watermark
+   * downstream if this bolt has external outputs.
+   */
+  private void processWatermark(long watermarkTs, int sourceTask) {
+    long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
+    LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
+        (new Instant(watermarkTs)).toDateTime(),
+        sourceTask,
+        (new Instant(newWaterMark)).toDateTime());
+    if (newWaterMark != 0) {
+      // Some buffer windows are going to be triggered.
+      doFnStartBundle();
+      timerService.fireTimers(newWaterMark);
+
+      // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
+      // to be received from now on. So we are going to process all push back data.
+      if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        for (DoFnExecutor doFnExecutor : doFnExecutors) {
+          doFnExecutor.processAllPushBackElements();
+        }
+      }
+
+      doFnFinishBundle();
+    }
+
+    long currentWaterMark = timerService.currentOutputWatermark();
+    if (!externalOutputTags.isEmpty()) {
+      // Flush pending output first, then emit the watermark.
+      collector.flush();
+      collector.emit(
+          CommonInstance.BEAM_WATERMARK_STREAM_ID,
+          new Values(currentWaterMark));
+      LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
+    }
+  }
+
+  /** Deserializes one tuple value list and dispatches it under a tag named after the stream. */
+  private void processElement(List<Object> values, String streamId) {
+    TupleTag inputTag = new TupleTag(streamId);
+    WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
+    processExecutorElem(inputTag, windowedValue);
+  }
+
+  /**
+   * Hands {@code elem} to the executor registered for {@code inputTag} (if any) and emits it
+   * outside the bolt when the tag is an external output tag.
+   *
+   * <p>A null element is logged and dropped. The null check is done before the element is
+   * dereferenced for logging, so a null element no longer causes a NullPointerException.
+   */
+  public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
+    if (elem == null) {
+      LOG.info("Received null elem for tag={}", inputTag);
+      return;
+    }
+    LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
+    Executor executor = inputTagToExecutor.get(inputTag);
+    if (executor != null) {
+      executor.process(inputTag, elem);
+    }
+    if (externalOutputTags.contains(inputTag)) {
+      emitOutsideBolt(inputTag, elem);
+    }
+  }
+
+  /** Cleans up every distinct registered executor, then closes the kv store manager. */
+  @Override
+  public void cleanup() {
+    for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+      executor.cleanup();
+    }
+    executorContext.getKvStoreManager().close();
+  }
+
+  /** Returns null, i.e. this bolt supplies no component configuration of its own. */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  public TimerService timerService() {
+    return timerService;
+  }
+
+  public void setTimerService(TimerService service) {
+    timerService = service;
+  }
+
+  /**
+   * Rebuilds a {@link WindowedValue} from tuple values. With more than one value, the first is
+   * taken as the key and the second as the serialized windowed value, and the result carries a
+   * {@link KV} of both; otherwise the single value is the serialized windowed value itself.
+   */
+  private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
+    WindowedValue wv = null;
+    if (values.size() > 1) {
+      Object key = values.get(0);
+      WindowedValue value = serializer.deserialize((byte[]) values.get(1));
+      wv = value.withValue(KV.of(key, value.getValue()));
+    } else {
+      wv = serializer.deserialize((byte[]) values.get(0));
+    }
+    return wv;
+  }
+
+  /**
+   * Serializes {@code outputValue} and emits it on the stream named after {@code outputTag}.
+   * For keyed streams the key is emitted as a separate tuple field.
+   */
+  protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
+    LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
+    if (keyedEmit(outputTag.getId())) {
+      KV kv = (KV) outputValue.getValue();
+      byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
+      // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+      if (kv.getKey() == null) {
+        // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
+        collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
+      } else {
+        collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
+      }
+    } else {
+      byte[] immutableOutputValue = serializer.serialize(outputValue);
+      collector.emit(outputTag.getId(), new Values(immutableOutputValue));
+    }
+  }
+
+  /** Calls {@code startBundle} on every DoFn executor of this bolt. */
+  private void doFnStartBundle() {
+    for (DoFnExecutor doFnExecutor : doFnExecutors) {
+      doFnExecutor.startBundle();
+    }
+  }
+
+  /** Calls {@code finishBundle} on every DoFn executor of this bolt. */
+  private void doFnFinishBundle() {
+    for (DoFnExecutor doFnExecutor : doFnExecutors) {
+      doFnExecutor.finishBundle();
+    }
+  }
+
+  /**
+   * Returns a newline-separated listing of the internal executors followed by the ids of the
+   * external output tags, terminated by a newline.
+   */
+  @Override
+  public String toString() {
+    List<String> ret = new ArrayList<>();
+    ret.add("internalExecutors");
+    for (Executor executor : inputTagToExecutor.values()) {
+      ret.add(executor.toString());
+    }
+    ret.add("externalOutputTags");
+    for (TupleTag output : externalOutputTags) {
+      ret.add(output.getId());
+    }
+    return Joiner.on('\n').join(ret).concat("\n");
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
new file mode 100644
index 0000000..1ef28c9
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class FlattenExecutor<InputT> implements Executor {
+
+ private final String description;
+ private TupleTag mainOutputTag;
+ private ExecutorContext context;
+ private ExecutorsBolt executorsBolt;
+
+ public FlattenExecutor(String description, TupleTag mainTupleTag) {
+ this.description = checkNotNull(description, "description");
+ this.mainOutputTag = mainTupleTag;
+ }
+
+ @Override
+ public void init(ExecutorContext context) {
+ this.context = context;
+ this.executorsBolt = context.getExecutorsBolt();
+ }
+
+ @Override
+ public void process(TupleTag tag, WindowedValue elem) {
+ executorsBolt.processExecutorElem(mainOutputTag, elem);
+ }
+
+ @Override
+ public void cleanup() {}
+
+ @Override
+ public String toString() {
+ return description;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
new file mode 100644
index 0000000..419a4a0
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * {@link DoFnExecutor} for the group-by-window step. Incoming {@code KV} elements are converted
+ * to {@link KeyedWorkItem}s and run through a DoFn created by
+ * {@link GroupAlsoByWindowViaWindowSetNewDoFn} with a buffering {@link SystemReduceFn}; timers
+ * are delivered to the same runner as timer work items.
+ */
+public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
+ private static final long serialVersionUID = -7563050475488610553L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
+
+ // Output manager that hands every output element back to the owning bolt for dispatching.
+ private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ executorsBolt.processExecutorElem(tag, output);
+ }
+ }
+
+ // Coder of the KV input collection; its value coder is used to build the buffering reduce fn.
+ private KvCoder<K, V> inputKvCoder;
+ // Assigned in getGroupByWindowDoFn().
+ private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
+
+ /**
+ * Creates the executor. The input coder is read from the user graph context of
+ * {@code context}; the input is cast to a {@code PCollection<KV<K, V>>} with a {@link KvCoder}.
+ */
+ public GroupByWindowExecutor(
+ String stepName,
+ String description,
+ TranslationContext context,
+ StormPipelineOptions pipelineOptions,
+ WindowingStrategy<?, ?> windowingStrategy,
+ TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
+ // The DoFn is created at runtime (see getGroupByWindowDoFn), so null is passed here.
+ super(stepName, description, pipelineOptions, null, null, windowingStrategy, null, null, null, mainTupleTag, sideOutputTags);
+
+ this.outputManager = new GroupByWindowOutputManager();
+ UserGraphContext userGraphContext = context.getUserGraphContext();
+ PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
+ this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
+ }
+
+ /**
+ * Builds the group-also-by-window DoFn, wiring in per-key state and timer internals backed by
+ * the JStorm kv store and the bolt's timer service.
+ */
+ private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
+ final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
+ @Override
+ public StateInternals stateInternalsForKey(K key) {
+ return new JStormStateInternals<K>(key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+ }
+ };
+ TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
+ @Override
+ public TimerInternals timerInternalsForKey(K key) {
+ return new JStormTimerInternals<>(key, GroupByWindowExecutor.this, executorContext.getExecutorsBolt().timerService());
+ }
+ };
+
+ // Buffering reduce fn: values are collected per key and window into an Iterable.
+ reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+ DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn =
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
+ (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
+ return doFn;
+ }
+
+ /**
+ * Creates the DoFn and returns its runner: a simple runner wrapped in a late-data-dropping
+ * runner, wrapped in turn in a {@code DoFnRunnerWithMetrics}.
+ */
+ @Override
+ protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
+ // The DoFn must be assigned before the runner is built, since the runner is given this.doFn.
+ doFn = getGroupByWindowDoFn();
+
+ DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>simpleRunner(
+ this.pipelineOptions,
+ this.doFn,
+ NullSideInputReader.empty(),
+ this.outputManager,
+ this.mainTupleTag,
+ this.sideOutputTags,
+ this.stepContext,
+ this.windowingStrategy);
+
+ DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner = DoFnRunners.lateDataDroppingRunner(
+ simpleRunner,
+ this.stepContext,
+ this.windowingStrategy);
+ return new DoFnRunnerWithMetrics<>(
+ stepName, doFnRunner, MetricsReporter.create(metricClient));
+ }
+
+ /** Converts the received KV element into a keyed work item and processes it with the runner. */
+ @Override
+ public void process(TupleTag tag, WindowedValue elem) {
+ /*
+ * For GroupByKey, a KV type elem is received. It has to be converted into a
+ * KeyedWorkItem first, which is the type expected by the late-data-dropping runner.
+ */
+ KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
+ runner.processElement(elem.withValue(keyedWorkItem));
+ }
+
+ /**
+ * Delivers a fired timer to the runner as a timers-only work item in the global window.
+ *
+ * @throws IllegalArgumentException if the timer's namespace is not a window namespace
+ */
+ @Override
+ public void onTimer(Object key, TimerInternals.TimerData timerData) {
+ StateNamespace namespace = timerData.getNamespace();
+ checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+
+ runner.processElement(
+ WindowedValue.valueInGlobalWindow(
+ KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
+ }
+
+ // Delegates to the superclass implementation unchanged.
+ @Override
+ public String toString() {
+ return super.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
new file mode 100644
index 0000000..a022440
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
+import com.alibaba.jstorm.common.metric.AsmCounter;
+import com.alibaba.jstorm.metric.MetricClient;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine.
+ *
+ * <p>Beam counters are cumulative, so each call to {@link #updateMetrics()} reports only the
+ * increase since the value that was reported last for the same metric name.
+ */
+public class MetricsReporter {
+
+  private static final String METRIC_KEY_SEPARATOR = "__";
+  private static final String COUNTER_PREFIX = "__counter";
+
+  private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
+  // Last cumulative value already reported to JStorm, keyed by metric name.
+  private final Map<String, Long> reportedCounters = Maps.newHashMap();
+  private final MetricClient metricClient;
+
+  /**
+   * Creates a reporter that registers its counters with {@code metricClient}.
+   *
+   * @throws NullPointerException if {@code metricClient} is null
+   */
+  public static MetricsReporter create(MetricClient metricClient) {
+    return new MetricsReporter(metricClient);
+  }
+
+  private MetricsReporter(MetricClient metricClient) {
+    this.metricClient = checkNotNull(metricClient, "metricClient");
+  }
+
+  /** Returns the metrics container of the given step from the held step map. */
+  public MetricsContainer getMetricsContainer(String stepName) {
+    return metricsContainers.getContainer(stepName);
+  }
+
+  /** Queries all attempted metrics from the held containers and reports the counters. */
+  public void updateMetrics() {
+    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
+    MetricQueryResults metricQueryResults =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+    updateCounters(metricQueryResults.counters());
+  }
+
+  /**
+   * Pushes the increase of every counter since its last reported value to the JStorm counter of
+   * the same name. Counters whose value did not grow are skipped.
+   */
+  private void updateCounters(Iterable<MetricResult<Long>> counters) {
+    for (MetricResult<Long> metricResult : counters) {
+      String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
+      Long updateValue = metricResult.attempted();
+      Long oldValue = reportedCounters.get(metricName);
+
+      if (oldValue == null || oldValue < updateValue) {
+        AsmCounter counter = metricClient.registerCounter(metricName);
+        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
+        counter.update(incValue);
+        // Remember what has been reported; without this the full cumulative value would be
+        // added again on every call.
+        reportedCounters.put(metricName, updateValue);
+      }
+    }
+  }
+
+  /** Builds the metric name as prefix, step, namespace and name joined by the key separator. */
+  private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
+    return prefix
+        + METRIC_KEY_SEPARATOR + metricResult.step()
+        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
+        + METRIC_KEY_SEPARATOR + metricResult.name().name();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
new file mode 100644
index 0000000..28dc234
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link DoFnExecutor} for a DoFn with several outputs. It installs an output manager that
+ * translates the DoFn's local output tags before elements are dispatched through the bolt.
+ */
+public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
+
+  // Mapping from a "local" tag used by the producer to the tag used by downstream consumers.
+  protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
+
+  /**
+   * Output manager for the multi-output case. The producer currently outputs with a "local"
+   * tuple tag while downstream consumers use a generated tag, so a local tag found in
+   * {@code localTupleTagMap} is replaced by its mapped "external" tag before dispatching; any
+   * other tag is dispatched unchanged. See PCollectionTuple for details.
+   */
+  public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      TupleTag<T> dispatchTag =
+          localTupleTagMap.containsKey(tag) ? (TupleTag<T>) localTupleTagMap.get(tag) : tag;
+      executorsBolt.processExecutorElem(dispatchTag, output);
+    }
+  }
+
+  /**
+   * Passes all arguments except {@code localTupleTagMap} to the superclass constructor, then
+   * stores the tag mapping and installs the tag-translating output manager.
+   */
+  public MultiOutputDoFnExecutor(
+      String stepName,
+      String description,
+      StormPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Coder<WindowedValue<InputT>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<InputT> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags,
+      Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+    super(
+        stepName,
+        description,
+        pipelineOptions,
+        doFn,
+        inputCoder,
+        windowingStrategy,
+        mainInputTag,
+        sideInputs,
+        sideInputTagToView,
+        mainTupleTag,
+        sideOutputTags);
+    this.localTupleTagMap = localTupleTagMap;
+    this.outputManager = new MultiOutputDoFnExecutorOutputManager();
+    LOG.info("localTupleTagMap: {}", localTupleTagMap);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
new file mode 100644
index 0000000..a58a818
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link MultiOutputDoFnExecutor} for {@link KV} main input that installs key-scoped timer and
+ * state internals on the step context before main-input elements are processed, and key-scoped
+ * state internals before timer callbacks are delivered.
+ */
+public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
+
+  public MultiStatefulDoFnExecutor(
+      String stepName,
+      String description,
+      StormPipelineOptions pipelineOptions,
+      DoFn<KV, OutputT> doFn,
+      Coder<WindowedValue<KV>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags,
+      Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+    super(
+        stepName,
+        description,
+        pipelineOptions,
+        doFn,
+        inputCoder,
+        windowingStrategy,
+        mainInputTag,
+        sideInputs,
+        sideInputTagToView,
+        mainTupleTag,
+        sideOutputTags,
+        localTupleTagMap);
+  }
+
+  /**
+   * Routes an element: anything not tagged as main input is handled as side input; main-input
+   * elements are processed after timer and state internals are bound to the element's key.
+   */
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    if (!mainInputTag.equals(tag)) {
+      processSideInput(tag, elem);
+      return;
+    }
+
+    Object elementKey = ((WindowedValue<KV>) elem).getValue().getKey();
+    stepContext.setTimerInternals(
+        new JStormTimerInternals(
+            elementKey, this, executorContext.getExecutorsBolt().timerService()));
+    bindStateInternals(elementKey);
+    processMainInput(elem);
+  }
+
+  /** Binds state internals to the timer's key, then delegates to the parent implementation. */
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    bindStateInternals(key);
+    super.onTimer(key, timerData);
+  }
+
+  /** Installs fresh state internals scoped to {@code key} on the step context. */
+  private void bindStateInternals(Object key) {
+    stepContext.setStateInternals(
+        new JStormStateInternals<>(
+            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
new file mode 100644
index 0000000..269f03c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link DoFnExecutor} for {@link KV} main input that installs key-scoped timer and state
+ * internals on the step context before main-input elements are processed, and key-scoped state
+ * internals before timer callbacks are delivered.
+ */
+public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
+  public StatefulDoFnExecutor(
+      String stepName,
+      String description,
+      StormPipelineOptions pipelineOptions,
+      DoFn<KV, OutputT> doFn,
+      Coder<WindowedValue<KV>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags) {
+    super(
+        stepName,
+        description,
+        pipelineOptions,
+        doFn,
+        inputCoder,
+        windowingStrategy,
+        mainInputTag,
+        sideInputs,
+        sideInputTagToView,
+        mainTupleTag,
+        sideOutputTags);
+  }
+
+  /**
+   * Routes an element: anything not tagged as main input is handled as side input; main-input
+   * elements are processed after timer and state internals are bound to the element's key.
+   */
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    if (!mainInputTag.equals(tag)) {
+      processSideInput(tag, elem);
+      return;
+    }
+
+    Object elementKey = ((WindowedValue<KV>) elem).getValue().getKey();
+    stepContext.setTimerInternals(
+        new JStormTimerInternals(
+            elementKey, this, executorContext.getExecutorsBolt().timerService()));
+    bindStateInternals(elementKey);
+    processMainInput(elem);
+  }
+
+  /** Binds state internals to the timer's key, then delegates to the parent implementation. */
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    bindStateInternals(key);
+    super.onTimer(key, timerData);
+  }
+
+  /** Installs fresh state internals scoped to {@code key} on the step context. */
+  private void bindStateInternals(Object key) {
+    stepContext.setStateInternals(
+        new JStormStateInternals<>(
+            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
new file mode 100644
index 0000000..47db018
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.core.TimerInternals;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface that tracks input watermarks and manages timers in each bolt.
+ *
+ * <p>All watermark values exchanged through this interface are {@code long}s; the default
+ * implementation, {@code TimerServiceImpl}, treats them as epoch milliseconds.
+ */
+public interface TimerService extends Serializable {
+
+ /**
+ * Registers the upstream tasks whose watermarks feed this service. Must be called before
+ * {@link #updateInputWatermark}; {@code TimerServiceImpl} rejects updates until it has run.
+ *
+ * @param upStreamTasks ids of the upstream tasks to track
+ */
+ void init(List<Integer> upStreamTasks);
+
+ /**
+ * Records a new watermark reported by one upstream task.
+ *
+ * @param task id of the upstream task reporting the watermark
+ * @param inputWatermark watermark reported by that task
+ * @return in {@code TimerServiceImpl}, the new overall input watermark if this update advanced
+ * it, otherwise 0. NOTE(review): 0 doubles as the "no change" marker, so callers cannot
+ * distinguish it from a real watermark of 0 -- confirm callers only test for non-zero.
+ */
+ long updateInputWatermark(Integer task, long inputWatermark);
+
+ /**
+ * @return the current input watermark; {@code TimerServiceImpl} reports the smallest watermark
+ * across all registered upstream tasks
+ */
+ long currentInputWatermark();
+
+ /**
+ * @return the current output watermark; {@code TimerServiceImpl} reports the input watermark
+ * capped by the earliest registered watermark hold
+ */
+ long currentOutputWatermark();
+
+ /**
+ * Removes the watermark hold registered under the given namespace, if any.
+ *
+ * @param namespace namespace whose hold should be released
+ */
+ void clearWatermarkHold(String namespace);
+
+ /**
+ * Registers a watermark hold for the given namespace. In {@code TimerServiceImpl} an existing
+ * hold for the namespace is only replaced by an earlier one.
+ *
+ * @param namespace namespace owning the hold
+ * @param watermarkHold instant the output watermark must not pass while the hold is in place
+ */
+ void addWatermarkHold(String namespace, Instant watermarkHold);
+
+ /**
+ * Registers a timer to be delivered to the given executor for the given key.
+ * {@code TimerServiceImpl} accepts event-time timers only.
+ *
+ * @param key key the timer belongs to
+ * @param timerData the timer to register
+ * @param doFnExecutor executor whose {@code onTimer} is invoked when the timer fires
+ */
+ void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
+
+ /**
+ * Fires every registered timer whose timestamp is at or before the given watermark.
+ *
+ * @param newWatermark watermark up to which (inclusive) timers are fired
+ */
+ void fireTimers(long newWatermark);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
new file mode 100644
index 0000000..3b864d5
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import avro.shaded.com.google.common.collect.Maps;
+import avro.shaded.com.google.common.collect.Sets;
+import com.alibaba.jstorm.utils.Pair;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.joda.time.Instant;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Default implementation of {@link TimerService}.
+ *
+ * <p>Keeps the latest watermark reported by every upstream task plus a min-heap over those values,
+ * so the current input watermark is the heap head. Watermark holds and event-time timers are
+ * tracked the same way: a map for lookup by namespace / timer, and a heap for "earliest first".
+ *
+ * <p>Only {@link #updateInputWatermark} is synchronized; the remaining methods perform no locking.
+ */
+public class TimerServiceImpl implements TimerService {
+  private transient ExecutorContext executorContext;
+  private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
+
+  /** Latest watermark (millis) reported by each registered upstream task. */
+  private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark =
+      new ConcurrentHashMap<>();
+  /** Min-heap holding exactly one entry per registered upstream task. */
+  private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
+  /** Min-heap of the active watermark holds, one entry per namespace with a hold. */
+  private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
+  private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
+  // NOTE(review): this queue is transient while timerDataToKeyedExecutors is not, so a restored
+  // instance may hold timers in the map that are missing from the queue -- confirm how restored
+  // instances are expected to repopulate it.
+  private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
+      new PriorityQueue<>();
+  /** For each pending timer, the (executor id, key) pairs that must be notified when it fires. */
+  private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
+      timerDataToKeyedExecutors = new HashMap<>();
+
+  private boolean initialized = false;
+
+  public TimerServiceImpl() {
+  }
+
+  public TimerServiceImpl(ExecutorContext executorContext) {
+    this.executorContext = executorContext;
+    this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
+  }
+
+  /**
+   * Registers the given upstream tasks, each starting at the minimum timestamp.
+   *
+   * <p>A task that is already registered (repeated call, or duplicate id in the list) is left
+   * untouched, so the heap keeps exactly one entry per task. Extra entries would never be removed
+   * and would pin the input watermark.
+   */
+  @Override
+  public void init(List<Integer> upStreamTasks) {
+    long minWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    for (Integer task : upStreamTasks) {
+      if (upStreamTaskToInputWatermark.putIfAbsent(task, minWatermark) == null) {
+        inputWatermarks.add(minWatermark);
+      }
+    }
+    initialized = true;
+  }
+
+  /**
+   * Records a new watermark for one upstream task; watermarks that do not advance are ignored.
+   *
+   * @param task id of a task previously passed to {@link #init}
+   * @param taskInputWatermark watermark (millis) reported by that task
+   * @return the new overall input watermark if this update advanced it, otherwise 0
+   * @throws IllegalStateException if {@link #init} has not been called
+   * @throws IllegalArgumentException if the task was never registered
+   */
+  @Override
+  public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
+    checkState(initialized, "TimerService has not been initialized.");
+    Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
+    checkArgument(oldTaskInputWatermark != null, "Unknown upstream task: %s", task);
+    // Make sure the input watermark doesn't go backward.
+    if (taskInputWatermark > oldTaskInputWatermark) {
+      upStreamTaskToInputWatermark.put(task, taskInputWatermark);
+      inputWatermarks.add(taskInputWatermark);
+      inputWatermarks.remove(oldTaskInputWatermark);
+
+      // The overall watermark advanced only if the new minimum exceeds what this task held
+      // before; otherwise some other task is still holding the minimum back.
+      long newLocalInputWatermark = currentInputWatermark();
+      if (newLocalInputWatermark > oldTaskInputWatermark) {
+        return newLocalInputWatermark;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Fires, earliest first, every pending timer whose timestamp is at or before
+   * {@code newWatermark}.
+   *
+   * <p>A timer is dequeued and unregistered <em>before</em> its callbacks run. Callbacks may
+   * register new timers; if the queue head were removed only afterwards, a newly registered
+   * earlier timer would be dropped in place of the one that just fired. Consequently, if a
+   * callback throws, the timer being fired is no longer pending.
+   */
+  @Override
+  public void fireTimers(long newWatermark) {
+    TimerInternals.TimerData timerData;
+    while ((timerData = eventTimeTimersQueue.peek()) != null
+        && timerData.getTimestamp().getMillis() <= newWatermark) {
+      eventTimeTimersQueue.poll();
+      Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.remove(timerData);
+      if (keyedExecutors == null) {
+        // Queued timer without registered receivers: nothing to notify.
+        continue;
+      }
+      for (Pair<Integer, Object> keyedExecutor : keyedExecutors) {
+        DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
+        executor.onTimer(keyedExecutor.getSecond(), timerData);
+      }
+    }
+  }
+
+  /**
+   * @return the smallest watermark across all registered upstream tasks, or the minimum timestamp
+   *     if the service is uninitialized or no task is registered
+   */
+  @Override
+  public long currentInputWatermark() {
+    // peek() returns null on an empty heap (e.g. init with an empty task list); avoid unboxing it.
+    Long minWatermark = initialized ? inputWatermarks.peek() : null;
+    return minWatermark != null ? minWatermark : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+  }
+
+  /** @return the input watermark, capped by the earliest active watermark hold if there is one */
+  @Override
+  public long currentOutputWatermark() {
+    long inputWatermark = currentInputWatermark();
+    Instant earliestHold = watermarkHolds.peek();
+    return earliestHold == null
+        ? inputWatermark
+        : Math.min(inputWatermark, earliestHold.getMillis());
+  }
+
+  /** Releases the hold registered for {@code namespace}; does nothing if there is none. */
+  @Override
+  public void clearWatermarkHold(String namespace) {
+    Instant currentHold = namespaceToWatermarkHold.remove(namespace);
+    if (currentHold != null) {
+      watermarkHolds.remove(currentHold);
+    }
+  }
+
+  /**
+   * Registers a hold for {@code namespace}. An existing hold is only replaced when the new one is
+   * earlier; a later or equal hold is ignored.
+   */
+  @Override
+  public void addWatermarkHold(String namespace, Instant watermarkHold) {
+    Instant currentHold = namespaceToWatermarkHold.get(namespace);
+    if (currentHold != null && !watermarkHold.isBefore(currentHold)) {
+      return;
+    }
+    namespaceToWatermarkHold.put(namespace, watermarkHold);
+    watermarkHolds.add(watermarkHold);
+    if (currentHold != null) {
+      watermarkHolds.remove(currentHold);
+    }
+  }
+
+  /**
+   * Registers an event-time timer to be delivered to {@code doFnExecutor} for {@code key}.
+   *
+   * @throws IllegalArgumentException if the timer is not in the event-time domain
+   */
+  @Override
+  public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
+    checkArgument(
+        TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
+        "Does not support domain: %s.",
+        timerData.getDomain());
+    Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
+    if (keyedExecutors == null) {
+      // First receiver for this timer: create its receiver set and queue the timer once.
+      keyedExecutors = new HashSet<>();
+      timerDataToKeyedExecutors.put(timerData, keyedExecutors);
+      eventTimeTimersQueue.add(timerData);
+    }
+    keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
new file mode 100644
index 0000000..0fb88ab
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Transactional wrapper around an {@link ExecutorsBolt}.
+ *
+ * <p>Regular bolt callbacks are delegated to the wrapped bolt. The transaction callbacks persist
+ * the bolt's {@link TimerService} in a key-value store, checkpoint / back up / remove the stores
+ * of the bolt's {@link IKvStoreManager}, and restore both on init and rollback.
+ */
+public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
+
+  private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
+  private static final String TIMER_SERVICE_KEY = "timer_service_key";
+
+  private final ExecutorsBolt executorsBolt;
+  private IKvStoreManager kvStoreManager;
+  private IKvStore<String, TimerService> timerServiceStore;
+
+  /** Wraps the given bolt and marks it as stateful. */
+  public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
+    this.executorsBolt = executorsBolt;
+    this.executorsBolt.setStatefulBolt(true);
+  }
+
+  /** Prepares the wrapped bolt, then obtains its kv store manager and the timer service store. */
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    try {
+      executorsBolt.prepare(stormConf, context, collector);
+      kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
+      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+    } catch (IOException e) {
+      LOG.error("Failed to prepare stateful bolt", e);
+      // Keep the original exception as the cause so the stack trace is not lost.
+      throw new RuntimeException("Failed to prepare stateful bolt", e);
+    }
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    executorsBolt.execute(input);
+  }
+
+  @Override
+  public void cleanup() {
+    executorsBolt.cleanup();
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    executorsBolt.declareOutputFields(declarer);
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return executorsBolt.getComponentConfiguration();
+  }
+
+  @Override
+  public void initState(Object userState) {
+    LOG.info("Begin to init from state: {}", userState);
+    restore(userState);
+  }
+
+  /**
+   * Stores the bolt's current timer service, then checkpoints the kv store manager for the batch.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public Object finishBatch(long batchId) {
+    try {
+      timerServiceStore.put(TIMER_SERVICE_KEY, executorsBolt.timerService());
+    } catch (IOException e) {
+      LOG.error("Failed to store current timer service status", e);
+      throw new RuntimeException("Failed to store current timer service status", e);
+    }
+    kvStoreManager.checkpoint(batchId);
+    return null;
+  }
+
+  /** @return whatever the kv store manager's backup of the batch returns */
+  @Override
+  public Object commit(long batchId, Object state) {
+    return kvStoreManager.backup(batchId);
+  }
+
+  @Override
+  public void rollBack(Object userState) {
+    LOG.info("Begin to rollback from state: {}", userState);
+    restore(userState);
+  }
+
+  @Override
+  public void ackCommit(long batchId, long timeStamp) {
+    kvStoreManager.remove(batchId);
+  }
+
+  /**
+   * Restores the kv stores from {@code userState} and hands the bolt its timer service: the one
+   * found in the restored store, or a freshly initialized one if the store holds none.
+   */
+  private void restore(Object userState) {
+    try {
+      // restore all states
+      kvStoreManager.restore(userState);
+
+      // init timer service
+      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+      TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KEY);
+      if (timerService == null) {
+        timerService = executorsBolt.initTimerService();
+      }
+      executorsBolt.setTimerService(timerService);
+    } catch (IOException e) {
+      LOG.error("Failed to restore state", e);
+      throw new RuntimeException("Failed to restore state", e);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
new file mode 100644
index 0000000..22dd07b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreManagerFactory;
+import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
+
+ private static final String SOURCE_STORE_ID = "SourceCheckpoint";
+ private static final String CHECKPOINT_MARK = "CheckpointMark";
+
+ private UnboundedSourceSpout sourceSpout;
+ private UnboundedSource.UnboundedReader reader;
+ private IKvStoreManager kvStoreManager;
+ private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
+
+ public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
+ this.sourceSpout = sourceSpout;
+ }
+
+ private void restore(Object userState) {
+ try {
+ kvStoreManager.restore(userState);
+ sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
+ UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
+ sourceSpout.createSourceReader(checkpointMark);
+ reader = sourceSpout.getUnboundedSourceReader();
+ } catch (IOException e) {
+ LOG.error("Failed to init state", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void initState(Object userState) {
+ restore(userState);
+ }
+
+ @Override
+ public Object finishBatch(long checkpointId) {
+ try {
+ // Store check point mark from unbounded source reader
+ UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
+ sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
+
+ // checkpoint all kv stores in current manager
+ kvStoreManager.checkpoint(checkpointId);
+ } catch (IOException e) {
+ LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
+ throw new RuntimeException(e.getMessage());
+ }
+ return null;
+ }
+
+ @Override
+ public Object commit(long batchId, Object state) {
+ // backup kv stores to remote state backend
+ return kvStoreManager.backup(batchId);
+ }
+
+ @Override
+ public void rollBack(Object userState) {
+ restore(userState);
+ }
+
+ @Override
+ public void ackCommit(long batchId, long timeStamp) {
+ // remove obsolete state in bolt local and remote state backend
+ kvStoreManager.remove(batchId);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ sourceSpout.declareOutputFields(declarer);
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return sourceSpout.getComponentConfiguration();
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ try {
+ sourceSpout.open(conf, context, collector);
+ String storeName = String.format("task-%s", context.getThisTaskId());
+ String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+ kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, storePath, true);
+
+ reader = sourceSpout.getUnboundedSourceReader();
+ } catch (IOException e) {
+ LOG.error("Failed to open transactional unbounded source spout", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() {
+ sourceSpout.close();
+ }
+
+ @Override
+ public void activate() {
+ sourceSpout.activate();
+ }
+
+ @Override
+ public void deactivate() {
+ sourceSpout.deactivate();
+ }
+
+ @Override
+ public void nextTuple() {
+ sourceSpout.nextTuple();
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
new file mode 100644
index 0000000..973f703
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Spout implementation that wraps a Beam UnboundedSource.
+ *
+ * <p>While activated, {@link #nextTuple} drains the records currently available from the reader,
+ * emits each one serialized on the output tag's stream, and then emits the reader's watermark on
+ * the watermark stream whenever it has advanced.
+ *
+ * <p>TODO: add wrapper to support metrics in UnboundedSource.
+ */
+public class UnboundedSourceSpout extends AdaptorBasicSpout {
+  private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+
+  private final String description;
+  private final UnboundedSource source;
+  private final SerializedPipelineOptions serializedOptions;
+  private final TupleTag<?> outputTag;
+
+  private transient StormPipelineOptions pipelineOptions;
+  private transient UnboundedSource.UnboundedReader reader;
+  private transient SpoutOutputCollector collector;
+
+  /** Whether the reader is currently positioned on a record that has not been emitted yet. */
+  private volatile boolean hasNextRecord;
+  private final AtomicBoolean activated = new AtomicBoolean();
+
+  private KryoSerializer<WindowedValue> serializer;
+
+  /** Last watermark (millis) emitted downstream; only strictly larger watermarks are emitted. */
+  private long lastWaterMark = 0L;
+
+  public UnboundedSourceSpout(
+      String description,
+      UnboundedSource source,
+      StormPipelineOptions options,
+      TupleTag<?> outputTag) {
+    this.description = checkNotNull(description, "description");
+    this.source = checkNotNull(source, "source");
+    this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
+    this.outputTag = checkNotNull(outputTag, "outputTag");
+  }
+
+  /**
+   * Deactivates the spout and closes the reader. A failure to close is logged rather than
+   * propagated; calling this before a reader exists is a no-op apart from the deactivation.
+   */
+  @Override
+  public synchronized void close() {
+    activated.set(false);
+    if (reader == null) {
+      return;
+    }
+    try {
+      reader.close();
+    } catch (IOException e) {
+      LOG.warn("Failed to close unbounded source reader of " + description, e);
+    }
+  }
+
+  @Override
+  public void activate() {
+    activated.set(true);
+  }
+
+  @Override
+  public void deactivate() {
+    activated.set(false);
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  /** Deserializes the pipeline options, creates a reader without checkpoint and the serializer. */
+  @Override
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    try {
+      this.collector = collector;
+      this.pipelineOptions =
+          this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+
+      createSourceReader(null);
+
+      this.serializer = new KryoSerializer<>(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create unbounded reader.", e);
+    }
+  }
+
+  /**
+   * Closes the current reader, if any, then creates and starts a new one.
+   *
+   * @param checkpointMark checkpoint to resume from; {@code null} is passed on initial open
+   * @throws IOException if closing the old reader or creating/starting the new one fails
+   */
+  public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+    reader = this.source.createReader(this.pipelineOptions, checkpointMark);
+    hasNextRecord = this.reader.start();
+  }
+
+  /**
+   * Emits all records currently available from the reader, followed by the watermark if it has
+   * advanced. Does nothing while the spout is not activated.
+   */
+  @Override
+  public synchronized void nextTuple() {
+    if (!activated.get()) {
+      return;
+    }
+    try {
+      if (!hasNextRecord) {
+        hasNextRecord = reader.advance();
+      }
+
+      while (hasNextRecord && activated.get()) {
+        Object value = reader.getCurrent();
+        Instant timestamp = reader.getCurrentTimestamp();
+
+        WindowedValue wv =
+            WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+        LOG.debug("Source output: {}", wv.getValue());
+        if (keyedEmit(outputTag.getId())) {
+          KV kv = (KV) wv.getValue();
+          // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+          byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
+          collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
+        } else {
+          byte[] immutableValue = serializer.serialize(wv);
+          collector.emit(outputTag.getId(), new Values(immutableValue));
+        }
+
+        // move to next record
+        hasNextRecord = reader.advance();
+      }
+
+      Instant waterMark = reader.getWatermark();
+      if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
+        lastWaterMark = waterMark.getMillis();
+        // Flush pending data tuples before the watermark is emitted.
+        collector.flush();
+        collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
+        LOG.debug("Source output: WM-{}", waterMark.toDateTime());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Exception reading values from source.", e);
+    }
+  }
+
+  public UnboundedSource getUnboundedSource() {
+    return source;
+  }
+
+  public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
+    return reader;
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
new file mode 100644
index 0000000..7b0e8db
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * {@link Executor} used by the JStorm runner for {@link View} transforms.
+ *
+ * <p>Each element received by {@code process} is handed, unchanged, to the {@link ExecutorsBolt}
+ * obtained in {@code init}, under the output tag given at construction time.
+ */
+public class ViewExecutor implements Executor {
+
+  /** Tag under which every element is forwarded. */
+  private final TupleTag forwardTag;
+  /** Text returned by {@link #toString()}. */
+  private final String label;
+  /** Bolt that receives forwarded elements; assigned in {@code init}. */
+  private ExecutorsBolt bolt;
+
+  /**
+   * @param description text returned by {@link #toString()}
+   * @param outputTag tag under which elements are forwarded to the bolt
+   */
+  public ViewExecutor(String description, TupleTag outputTag) {
+    this.forwardTag = outputTag;
+    this.label = description;
+  }
+
+  /** Remembers the {@link ExecutorsBolt} exposed by {@code context}. */
+  @Override
+  public void init(ExecutorContext context) {
+    bolt = context.getExecutorsBolt();
+  }
+
+  /**
+   * Forwards {@code elem} to the bolt under this executor's output tag; the incoming
+   * {@code tag} is not consulted.
+   */
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    bolt.processExecutorElem(forwardTag, elem);
+  }
+
+  /** No resources to release. */
+  @Override
+  public void cleanup() {
+  }
+
+  @Override
+  public String toString() {
+    return label;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
new file mode 100644
index 0000000..a6c3c16
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
+ private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
+
+ private final String description;
+ private WindowFn<T, W> windowFn;
+ private ExecutorsBolt executorsBolt;
+ private TupleTag outputTag;
+
+ class JStormAssignContext<InputT, W extends BoundedWindow>
+ extends WindowFn<InputT, W>.AssignContext {
+ private final WindowedValue<InputT> value;
+
+ JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+ fn.super();
+ checkArgument(
+ Iterables.size(value.getWindows()) == 1,
+ String.format(
+ "%s passed to window assignment must be in a single window, but it was in %s: %s",
+ WindowedValue.class.getSimpleName(),
+ Iterables.size(value.getWindows()),
+ value.getWindows()));
+ this.value = value;
+ }
+
+ @Override
+ public InputT element() {
+ return value.getValue();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return value.getTimestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return Iterables.getOnlyElement(value.getWindows());
+ }
+ }
+
+ public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
+ this.description = description;
+ this.windowFn = windowFn;
+ this.outputTag = outputTag;
+ }
+
+ @Override
+ public void init(ExecutorContext context) {
+ this.executorsBolt = context.getExecutorsBolt();
+ }
+
+ @Override
+ public void process(TupleTag tag, WindowedValue elem) {
+ Collection<W> windows = null;
+ try {
+ windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
+ for (W window: windows) {
+ executorsBolt.processExecutorElem(
+ outputTag,
+ WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to assign windows for elem=" + elem, e);
+ }
+ }
+
+ @Override
+ public void cleanup() {}
+
+
+ @Override
+ public String toString() {
+ return description;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
new file mode 100644
index 0000000..eaf0549
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * JStorm implementation of {@link BagState}.
+ */
+class JStormBagState<K, T> implements BagState<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
+
+ @Nullable
+ private final K key;
+ private final StateNamespace namespace;
+ private final IKvStore<ComposedKey, T> kvState;
+ private final IKvStore<ComposedKey, Object> stateInfoKvState;
+ private int elemIndex;
+
+ public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
+ IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+ this.key = key;
+ this.namespace = checkNotNull(namespace, "namespace");
+ this.kvState = checkNotNull(kvState, "kvState");
+ this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+
+ Integer index = (Integer) stateInfoKvState.get(getComposedKey());
+ this.elemIndex = index != null ? ++index : 0;
+ }
+
+ @Override
+ public void add(T input) {
+ try {
+ kvState.put(getComposedKey(elemIndex), input);
+ stateInfoKvState.put(getComposedKey(), elemIndex);
+ elemIndex++;
+ } catch (IOException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public Boolean read() {
+ return elemIndex <= 0;
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public Iterable<T> read() {
+ return new BagStateIterable(elemIndex);
+ }
+
+ @Override
+ public BagState readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ try {
+ for (int i = 0; i < elemIndex; i++) {
+ kvState.remove(getComposedKey(i));
+ }
+ stateInfoKvState.remove(getComposedKey());
+ elemIndex = 0;
+ } catch (IOException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ private ComposedKey getComposedKey() {
+ return ComposedKey.of(key, namespace);
+ }
+
+ private ComposedKey getComposedKey(int elemIndex) {
+ return ComposedKey.of(key, namespace, elemIndex);
+ }
+
+ private class BagStateIterable implements KvStoreIterable<T> {
+
+ private class BagStateIterator implements Iterator<T> {
+ private final int size;
+ private int cursor = 0;
+
+ BagStateIterator() {
+ Integer s = null;
+ try {
+ s = (Integer) stateInfoKvState.get(getComposedKey());
+ } catch (IOException e) {
+ LOG.error("Failed to get elemIndex for key={}", getComposedKey());
+ }
+ this.size = s != null ? ++s : 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cursor < size;
+ }
+
+ @Override
+ public T next() {
+ if (cursor >= size) {
+ throw new NoSuchElementException();
+ }
+
+ T value = null;
+ try {
+ value = kvState.get(getComposedKey(cursor));
+ } catch (IOException e) {
+ LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
+ }
+ cursor++;
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private final int size;
+
+ BagStateIterable(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return new BagStateIterator();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("BagStateIterable: composedKey=%s", getComposedKey());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
new file mode 100644
index 0000000..b0fe29b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * JStorm implementation of {@link CombiningState}.
+ */
+public class JStormCombiningState<InputT, AccumT, OutputT>
+ implements CombiningState<InputT, AccumT, OutputT> {
+
+ @Nullable
+ private final BagState<AccumT> accumBagState;
+ private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+ JStormCombiningState(
+ BagState<AccumT> accumBagState,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ this.accumBagState = checkNotNull(accumBagState, "accumBagState");
+ this.combineFn = checkNotNull(combineFn, "combineFn");
+ }
+
+ @Override
+ public AccumT getAccum() {
+ // TODO: replacing the accumBagState with the merged accum.
+ return combineFn.mergeAccumulators(accumBagState.read());
+ }
+
+ @Override
+ public void addAccum(AccumT accumT) {
+ accumBagState.add(accumT);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
+ return combineFn.mergeAccumulators(iterable);
+ }
+
+ @Override
+ public void add(InputT input) {
+ accumBagState.add(
+ combineFn.addInput(combineFn.createAccumulator(), input));
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return accumBagState.isEmpty();
+ }
+
+ @Override
+ public OutputT read() {
+ return combineFn.extractOutput(
+ combineFn.mergeAccumulators(accumBagState.read()));
+ }
+
+ @Override
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ accumBagState.clear();
+ }
+}