You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:13 UTC
[13/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
index 9df1e17..e80fb48 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
@@ -17,13 +17,15 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
-import java.io.IOException;
-import java.util.*;
+import static com.google.common.base.Preconditions.checkNotNull;
import avro.shaded.com.google.common.base.Joiner;
import avro.shaded.com.google.common.collect.Sets;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.ITupleExt;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
import com.alibaba.jstorm.cache.IKvStoreManager;
import com.alibaba.jstorm.cache.KvStoreManagerFactory;
import com.alibaba.jstorm.cluster.Common;
@@ -31,6 +33,14 @@ import com.alibaba.jstorm.utils.KryoSerializer;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -39,289 +49,287 @@ import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
public class ExecutorsBolt extends AdaptorBasicBolt {
- private static final long serialVersionUID = -7751043327801735211L;
+ private static final long serialVersionUID = -7751043327801735211L;
- private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
- protected ExecutorContext executorContext;
+ protected ExecutorContext executorContext;
- protected TimerService timerService;
+ protected TimerService timerService;
- // map from input tag to executor inside bolt
- protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
- // set of all output tags that will be emit outside bolt
- protected final Set<TupleTag> outputTags = Sets.newHashSet();
- protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
- protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
- protected int internalDoFnExecutorId = 1;
- protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
+ // map from input tag to executor inside bolt
+ protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
+ // set of all output tags that will be emitted outside the bolt
+ protected final Set<TupleTag> outputTags = Sets.newHashSet();
+ protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
+ protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
+ protected int internalDoFnExecutorId = 1;
+ protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
- protected OutputCollector collector;
+ protected OutputCollector collector;
- protected boolean isStatefulBolt = false;
+ protected boolean isStatefulBolt = false;
- protected KryoSerializer<WindowedValue> serializer;
+ protected KryoSerializer<WindowedValue> serializer;
- public ExecutorsBolt() {
+ public ExecutorsBolt() {
- }
-
- public void setStatefulBolt(boolean isStateful) {
- isStatefulBolt = isStateful;
- }
-
- public void addExecutor(TupleTag inputTag, Executor executor) {
- inputTagToExecutor.put(
- checkNotNull(inputTag, "inputTag"),
- checkNotNull(executor, "executor"));
- }
-
- public Map<TupleTag, Executor> getExecutors() {
- return inputTagToExecutor;
- }
-
- public void registerExecutor(Executor executor) {
- if (executor instanceof DoFnExecutor) {
- DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
- idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
- doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
- internalDoFnExecutorId++;
- }
- }
-
- public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
- return idToDoFnExecutor;
- }
+ }
- public void addOutputTags(TupleTag tag) {
- outputTags.add(tag);
- }
+ /**
+ * Sets the flag that {@code prepare} uses to choose between the monitored and the plain
+ * KV store manager.
+ */
+ public void setStatefulBolt(boolean isStateful) {
+ isStatefulBolt = isStateful;
+ }
- public void addExternalOutputTag(TupleTag<?> tag) {
- externalOutputTags.add(tag);
- }
+ /**
+ * Maps {@code inputTag} to the executor that processes elements delivered under that tag.
+ * Both arguments are passed through {@code checkNotNull}, so neither may be null.
+ */
+ public void addExecutor(TupleTag inputTag, Executor executor) {
+ inputTagToExecutor.put(
+ checkNotNull(inputTag, "inputTag"),
+ checkNotNull(executor, "executor"));
+ }
- public Set<TupleTag> getOutputTags() {
- return outputTags;
- }
+ /** Returns the input-tag-to-executor map itself (not a copy). */
+ public Map<TupleTag, Executor> getExecutors() {
+ return inputTagToExecutor;
+ }
- public ExecutorContext getExecutorContext() {
- return executorContext;
+ public void registerExecutor(Executor executor) {
+ if (executor instanceof DoFnExecutor) {
+ DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
+ idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
+ doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
+ internalDoFnExecutorId++;
}
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- LOG.info("Start to prepare for task-{}", context.getThisTaskId());
- try {
- this.collector = collector;
-
- // init kv store manager
- String storeName = String.format("task-%d", context.getThisTaskId());
- String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
- IKvStoreManager kvStoreManager = isStatefulBolt ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, stateStorePath, isStatefulBolt) :
- KvStoreManagerFactory.getKvStoreManager(stormConf, storeName, stateStorePath, isStatefulBolt);
- this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
-
- // init time service
- timerService = initTimerService();
-
- // init all internal executors
- for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
- executor.init(executorContext);
- if (executor instanceof DoFnExecutor) {
- doFnExecutors.add((DoFnExecutor) executor);
- }
- }
-
- this.serializer = new KryoSerializer<WindowedValue>(stormConf);
-
- LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
- LOG.info("inputTagToExecutor={}", inputTagToExecutor);
- LOG.info("outputTags={}", outputTags);
- LOG.info("externalOutputTags={}", externalOutputTags);
- LOG.info("doFnExecutors={}", doFnExecutors);
- } catch (IOException e) {
- throw new RuntimeException("Failed to prepare executors bolt", e);
+ }
+
+ /**
+ * Returns the map (not a copy) from internal executor id to the {@code DoFnExecutor} that
+ * {@code registerExecutor} stored under it.
+ */
+ public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
+ return idToDoFnExecutor;
+ }
+
+ /** Adds {@code tag} to this bolt's set of output tags. */
+ public void addOutputTags(TupleTag tag) {
+ outputTags.add(tag);
+ }
+
+ /**
+ * Marks {@code tag} as external. {@code processExecutorElem} emits every element delivered
+ * under an external tag through the collector.
+ */
+ public void addExternalOutputTag(TupleTag<?> tag) {
+ externalOutputTags.add(tag);
+ }
+
+ /** Returns the set of output tags itself (not a copy). */
+ public Set<TupleTag> getOutputTags() {
+ return outputTags;
+ }
+
+ /**
+ * Returns the executor context. The field is only assigned in {@code prepare}, so this is
+ * null before the bolt has been prepared.
+ */
+ public ExecutorContext getExecutorContext() {
+ return executorContext;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ LOG.info("Start to prepare for task-{}", context.getThisTaskId());
+ try {
+ this.collector = collector;
+
+ // init kv store manager
+ String storeName = String.format("task-%d", context.getThisTaskId());
+ String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+ IKvStoreManager kvStoreManager = isStatefulBolt ?
+ KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+ context, storeName, stateStorePath, isStatefulBolt) :
+ KvStoreManagerFactory.getKvStoreManager(
+ stormConf, storeName, stateStorePath, isStatefulBolt);
+ this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
+
+ // init time service
+ timerService = initTimerService();
+
+ // init all internal executors
+ for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+ executor.init(executorContext);
+ if (executor instanceof DoFnExecutor) {
+ doFnExecutors.add((DoFnExecutor) executor);
}
- }
+ }
- public TimerService initTimerService() {
- TopologyContext context = executorContext.getTopologyContext();
- List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
- .transformAndConcat(
- new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
- @Override
- public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
- if (Common.isSystemComponent(value.getKey())) {
- return Collections.EMPTY_LIST;
- } else {
- return value.getValue();
- }
- }
- })
- .toList();
- TimerService ret = new TimerServiceImpl(executorContext);
- ret.init(tasks);
- return ret;
- }
+ this.serializer = new KryoSerializer<WindowedValue>(stormConf);
- @Override
- public void execute(Tuple input) {
- // process a batch
- String streamId = input.getSourceStreamId();
- ITupleExt tuple = (ITupleExt) input;
- Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
- if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
- while (valueIterator.hasNext()) {
- processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
- }
- } else {
- doFnStartBundle();
- while (valueIterator.hasNext()) {
- processElement(valueIterator.next(), streamId);
- }
- doFnFinishBundle();
- }
+ LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
+ LOG.info("inputTagToExecutor={}", inputTagToExecutor);
+ LOG.info("outputTags={}", outputTags);
+ LOG.info("externalOutputTags={}", externalOutputTags);
+ LOG.info("doFnExecutors={}", doFnExecutors);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to prepare executors bolt", e);
}
-
- private void processWatermark(long watermarkTs, int sourceTask) {
- long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
- LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
- (new Instant(watermarkTs)).toDateTime(), sourceTask, (new Instant(newWaterMark)).toDateTime());
- if (newWaterMark != 0) {
- // Some buffer windows are going to be triggered.
- doFnStartBundle();
- timerService.fireTimers(newWaterMark);
-
- // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
- // to be received from now on. So we are going to process all push back data.
- if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
- for (DoFnExecutor doFnExecutor : doFnExecutors) {
- doFnExecutor.processAllPushBackElements();
+ }
+
+ public TimerService initTimerService() {
+ TopologyContext context = executorContext.getTopologyContext();
+ List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
+ .transformAndConcat(
+ new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
+ @Override
+ public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
+ if (Common.isSystemComponent(value.getKey())) {
+ return Collections.EMPTY_LIST;
+ } else {
+ return value.getValue();
}
- }
-
- doFnFinishBundle();
- }
-
- long currentWaterMark = timerService.currentOutputWatermark();
- if (!externalOutputTags.isEmpty()) {
- collector.flush();
- collector.emit(
- CommonInstance.BEAM_WATERMARK_STREAM_ID,
- new Values(currentWaterMark));
- LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
- }
- }
-
- private void processElement(List<Object> values, String streamId) {
- TupleTag inputTag = new TupleTag(streamId);
- WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
- processExecutorElem(inputTag, windowedValue);
+ }
+ })
+ .toList();
+ TimerService ret = new TimerServiceImpl(executorContext);
+ ret.init(tasks);
+ return ret;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ // process a batch
+ String streamId = input.getSourceStreamId();
+ ITupleExt tuple = (ITupleExt) input;
+ Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
+ if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
+ while (valueIterator.hasNext()) {
+ processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
+ }
+ } else {
+ doFnStartBundle();
+ while (valueIterator.hasNext()) {
+ processElement(valueIterator.next(), streamId);
+ }
+ doFnFinishBundle();
}
-
- public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
- LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
- if (elem != null) {
- Executor executor = inputTagToExecutor.get(inputTag);
- if (executor != null) {
- executor.process(inputTag, elem);
- }
- if (externalOutputTags.contains(inputTag)) {
- emitOutsideBolt(inputTag, elem);
- }
- } else {
- LOG.info("Received null elem for tag={}", inputTag);
+ }
+
+ private void processWatermark(long watermarkTs, int sourceTask) {
+ long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
+ LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
+ (new Instant(watermarkTs)).toDateTime(),
+ sourceTask,
+ (new Instant(newWaterMark)).toDateTime());
+ if (newWaterMark != 0) {
+ // Some buffer windows are going to be triggered.
+ doFnStartBundle();
+ timerService.fireTimers(newWaterMark);
+
+ // SideInput: a watermark at the max timestamp means no more data will arrive,
+ // so all pushed-back elements are processed now.
+ if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ for (DoFnExecutor doFnExecutor : doFnExecutors) {
+ doFnExecutor.processAllPushBackElements();
}
- }
+ }
- @Override
- public void cleanup() {
- for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
- executor.cleanup();
- }
- executorContext.getKvStoreManager().close();
+ doFnFinishBundle();
}
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
+ long currentWaterMark = timerService.currentOutputWatermark();
+ if (!externalOutputTags.isEmpty()) {
+ collector.flush();
+ collector.emit(
+ CommonInstance.BEAM_WATERMARK_STREAM_ID,
+ new Values(currentWaterMark));
+ LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
}
-
- public TimerService timerService() {
- return timerService;
+ }
+
+ /**
+ * Handles one data tuple: wraps the source stream id in a {@code TupleTag}, rebuilds the
+ * {@code WindowedValue} from the tuple values and hands both to {@code processExecutorElem}.
+ */
+ private void processElement(List<Object> values, String streamId) {
+ TupleTag inputTag = new TupleTag(streamId);
+ WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
+ processExecutorElem(inputTag, windowedValue);
+ }
+
+ /**
+ * Routes {@code elem} to the executor registered for {@code inputTag}, if there is one, and
+ * additionally emits it outside the bolt when {@code inputTag} is an external output tag.
+ *
+ * <p>A null {@code elem} is logged and otherwise ignored.
+ *
+ * @param inputTag tag under which the element is delivered
+ * @param elem the element to process; may be null
+ */
+ public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
+ if (elem != null) {
+ // Log only after the null check: dereferencing elem first threw on null input.
+ LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
+ Executor executor = inputTagToExecutor.get(inputTag);
+ if (executor != null) {
+ executor.process(inputTag, elem);
+ }
+ if (externalOutputTags.contains(inputTag)) {
+ emitOutsideBolt(inputTag, elem);
+ }
+ } else {
+ LOG.info("Received null elem for tag={}", inputTag);
}
+ }
- public void setTimerService(TimerService service) {
- timerService = service;
+ @Override
+ public void cleanup() {
+ for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+ executor.cleanup();
}
-
- private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
- WindowedValue wv = null;
- if (values.size() > 1) {
- Object key = values.get(0);
- WindowedValue value = serializer.deserialize((byte[]) values.get(1));
- wv = value.withValue(KV.of(key, value.getValue()));
- } else {
- wv = serializer.deserialize((byte[])values.get(0));
- }
- return wv;
+ executorContext.getKvStoreManager().close();
+ }
+
+ /** Always returns null; this bolt provides no component-level configuration map. */
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ /** Returns the timer service; {@code prepare} assigns it from {@code initTimerService}. */
+ public TimerService timerService() {
+ return timerService;
+ }
+
+ /** Replaces the timer service used by this bolt with {@code service}. */
+ public void setTimerService(TimerService service) {
+ timerService = service;
+ }
+
+ private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
+ WindowedValue wv = null;
+ if (values.size() > 1) {
+ Object key = values.get(0);
+ WindowedValue value = serializer.deserialize((byte[]) values.get(1));
+ wv = value.withValue(KV.of(key, value.getValue()));
+ } else {
+ wv = serializer.deserialize((byte[]) values.get(0));
}
-
- protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
- LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
- if (keyedEmit(outputTag.getId())) {
- KV kv = (KV) outputValue.getValue();
- byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
- // Convert WindowedValue<KV> to <K, WindowedValue<V>>
- if (kv.getKey() == null) {
- // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
- collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
- } else {
- collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
- }
- } else {
- byte[] immutableOutputValue = serializer.serialize(outputValue);
- collector.emit(outputTag.getId(), new Values(immutableOutputValue));
- }
+ return wv;
+ }
+
+ /**
+ * Serializes {@code outputValue} and emits it on the stream named by the id of
+ * {@code outputTag}.
+ *
+ * <p>When {@code keyedEmit} returns true for that stream id, the value is cast to {@code KV}:
+ * the key becomes its own tuple field and only the KV's value is serialized inside the
+ * windowed value. Otherwise the whole windowed value is serialized as a single field.
+ */
+ protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
+ LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
+ if (keyedEmit(outputTag.getId())) {
+ KV kv = (KV) outputValue.getValue();
+ byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
+ // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+ if (kv.getKey() == null) {
+ // A null key is emitted as the string "null", because JStorm ignores null values.
+ collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
+ } else {
+ collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
+ }
+ } else {
+ byte[] immutableOutputValue = serializer.serialize(outputValue);
+ collector.emit(outputTag.getId(), new Values(immutableOutputValue));
}
+ }
- private void doFnStartBundle() {
- for (DoFnExecutor doFnExecutor : doFnExecutors) {
- doFnExecutor.startBundle();
- }
+ /** Calls {@code startBundle} on every {@code DoFnExecutor} collected during {@code prepare}. */
+ private void doFnStartBundle() {
+ for (DoFnExecutor doFnExecutor : doFnExecutors) {
+ doFnExecutor.startBundle();
}
+ }
- private void doFnFinishBundle() {
- for (DoFnExecutor doFnExecutor : doFnExecutors) {
- doFnExecutor.finishBundle();
- }
+ /** Calls {@code finishBundle} on every {@code DoFnExecutor} collected during {@code prepare}. */
+ private void doFnFinishBundle() {
+ for (DoFnExecutor doFnExecutor : doFnExecutors) {
+ doFnExecutor.finishBundle();
}
+ }
- @Override
- public String toString() {
- // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString());
- List<String> ret = new ArrayList<>();
+ @Override
+ public String toString() {
+ // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString());
+ List<String> ret = new ArrayList<>();
/*ret.add("inputTags");
for (TupleTag inputTag : inputTagToExecutor.keySet()) {
ret.add(inputTag.getId());
}*/
- ret.add("internalExecutors");
- for (Executor executor : inputTagToExecutor.values()) {
- ret.add(executor.toString());
- }
- ret.add("externalOutputTags");
- for (TupleTag output : externalOutputTags) {
- ret.add(output.getId());
- }
- return Joiner.on('\n').join(ret).concat("\n");
+ ret.add("internalExecutors");
+ for (Executor executor : inputTagToExecutor.values()) {
+ ret.add(executor.toString());
+ }
+ ret.add("externalOutputTags");
+ for (TupleTag output : externalOutputTags) {
+ ret.add(output.getId());
}
+ return Joiner.on('\n').join(ret).concat("\n");
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
index 1ef28c9..5a07243 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
@@ -17,39 +17,40 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
-import static com.google.common.base.Preconditions.checkNotNull;
-
public class FlattenExecutor<InputT> implements Executor {
- private final String description;
- private TupleTag mainOutputTag;
- private ExecutorContext context;
- private ExecutorsBolt executorsBolt;
-
- public FlattenExecutor(String description, TupleTag mainTupleTag) {
- this.description = checkNotNull(description, "description");
- this.mainOutputTag = mainTupleTag;
- }
-
- @Override
- public void init(ExecutorContext context) {
- this.context = context;
- this.executorsBolt = context.getExecutorsBolt();
- }
-
- @Override
- public void process(TupleTag tag, WindowedValue elem) {
- executorsBolt.processExecutorElem(mainOutputTag, elem);
- }
-
- @Override
- public void cleanup() {}
-
- @Override
- public String toString() {
- return description;
- }
+ private final String description;
+ private TupleTag mainOutputTag;
+ private ExecutorContext context;
+ private ExecutorsBolt executorsBolt;
+
+ /**
+ * @param description text returned by {@code toString}; must not be null
+ * @param mainTupleTag tag under which {@code process} forwards every element
+ */
+ public FlattenExecutor(String description, TupleTag mainTupleTag) {
+ this.description = checkNotNull(description, "description");
+ this.mainOutputTag = mainTupleTag;
+ }
+
+ /** Stores the context and caches its {@code ExecutorsBolt}, which {@code process} forwards to. */
+ @Override
+ public void init(ExecutorContext context) {
+ this.context = context;
+ this.executorsBolt = context.getExecutorsBolt();
+ }
+
+ /**
+ * Forwards {@code elem} to the bolt under the main output tag. The incoming {@code tag} is
+ * not used, so elements from every input end up under the same tag.
+ */
+ @Override
+ public void process(TupleTag tag, WindowedValue elem) {
+ executorsBolt.processExecutorElem(mainOutputTag, elem);
+ }
+
+ /** No-op: this executor does nothing on cleanup. */
+ @Override
+ public void cleanup() {
+ }
+
+ /** Returns the description supplied at construction. */
+ @Override
+ public String toString() {
+ return description;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
index 299ceb2..625726d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
@@ -17,18 +17,17 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
-import java.io.Serializable;
-import java.util.List;
+import static com.google.common.base.Preconditions.checkArgument;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.List;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
@@ -36,122 +35,138 @@ import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkArgument;
-
-public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
- private static final long serialVersionUID = -7563050475488610553L;
-
- private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
-
- private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
-
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- executorsBolt.processExecutorElem(tag, output);
- }
- }
-
- private KvCoder<K, V> inputKvCoder;
- private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
-
- public GroupByWindowExecutor(
- String stepName,
- String description,
- TranslationContext context,
- JStormPipelineOptions pipelineOptions,
- WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
- // The doFn will be created when runtime. Just pass "null" here
- super(stepName, description, pipelineOptions, null, null, windowingStrategy, null, null, null, mainTupleTag, sideOutputTags);
-
- this.outputManager = new GroupByWindowOutputManager();
- UserGraphContext userGraphContext = context.getUserGraphContext();
- PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
- this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
- }
-
- private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
- final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
- @Override
- public StateInternals stateInternalsForKey(K key) {
- return new JStormStateInternals<K>(key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
- }
- };
- TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
- @Override
- public TimerInternals timerInternalsForKey(K key) {
- return new JStormTimerInternals<>(key, GroupByWindowExecutor.this, executorContext.getExecutorsBolt().timerService());
- }
- };
-
- reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
- DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn =
- GroupAlsoByWindowViaWindowSetNewDoFn.create(
- windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
- (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
- return doFn;
- }
-
- @Override
- protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
- doFn = getGroupByWindowDoFn();
-
- DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>simpleRunner(
- this.pipelineOptions,
- this.doFn,
- NullSideInputReader.empty(),
- this.outputManager,
- this.mainTupleTag,
- this.sideOutputTags,
- this.stepContext,
- this.windowingStrategy);
-
- DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner = DoFnRunners.lateDataDroppingRunner(
- simpleRunner,
- this.stepContext,
- this.windowingStrategy);
- return new DoFnRunnerWithMetrics<>(
- stepName, doFnRunner, MetricsReporter.create(metricClient));
- }
-
- @Override
- public void process(TupleTag tag, WindowedValue elem) {
- /**
- * For GroupByKey, KV type elem is received. We need to convert the KV elem
- * into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner.
- */
- KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
- runner.processElement(elem.withValue(keyedWorkItem));
- }
+public class GroupByWindowExecutor<K, V>
+ extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
+ private static final long serialVersionUID = -7563050475488610553L;
- @Override
- public void onTimer(Object key, TimerInternals.TimerData timerData) {
- StateNamespace namespace = timerData.getNamespace();
- checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+ private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
- runner.processElement(
- WindowedValue.valueInGlobalWindow(
- KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
- }
+ private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
@Override
- public String toString() {
- return super.toString();
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ executorsBolt.processExecutorElem(tag, output);
}
+ }
+
+ private KvCoder<K, V> inputKvCoder;
+ private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
+
+ public GroupByWindowExecutor(
+ String stepName,
+ String description,
+ TranslationContext context,
+ JStormPipelineOptions pipelineOptions,
+ WindowingStrategy<?, ?> windowingStrategy,
+ TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
+ // The doFn is created at runtime, so just pass "null" here.
+ super(
+ stepName,
+ description,
+ pipelineOptions,
+ null,
+ null,
+ windowingStrategy,
+ null,
+ null,
+ null,
+ mainTupleTag,
+ sideOutputTags);
+
+ this.outputManager = new GroupByWindowOutputManager();
+ UserGraphContext userGraphContext = context.getUserGraphContext();
+ PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
+ this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
+ }
+
+ private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
+ final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
+ @Override
+ public StateInternals stateInternalsForKey(K key) {
+ return new JStormStateInternals<K>(
+ key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+ }
+ };
+ TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
+ @Override
+ public TimerInternals timerInternalsForKey(K key) {
+ return new JStormTimerInternals<>(
+ key,
+ GroupByWindowExecutor.this,
+ executorContext.getExecutorsBolt().timerService());
+ }
+ };
+
+ reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+ DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn =
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
+ (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
+ return doFn;
+ }
+
+ @Override
+ protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
+ doFn = getGroupByWindowDoFn();
+
+ DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.simpleRunner(
+ this.pipelineOptions,
+ this.doFn,
+ NullSideInputReader.empty(),
+ this.outputManager,
+ this.mainTupleTag,
+ this.sideOutputTags,
+ this.stepContext,
+ this.windowingStrategy);
+
+ DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner =
+ DoFnRunners.lateDataDroppingRunner(
+ simpleRunner,
+ this.stepContext,
+ this.windowingStrategy);
+ return new DoFnRunnerWithMetrics<>(
+ stepName, doFnRunner, MetricsReporter.create(metricClient));
+ }
+
+ @Override
+ public void process(TupleTag tag, WindowedValue elem) {
+ /**
+ * For GroupByKey, KV type elem is received. We need to convert the KV elem
+ * into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner.
+ */
+ KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
+ runner.processElement(elem.withValue(keyedWorkItem));
+ }
+
+ @Override
+ public void onTimer(Object key, TimerInternals.TimerData timerData) {
+ StateNamespace namespace = timerData.getNamespace();
+ checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+
+ runner.processElement(
+ WindowedValue.valueInGlobalWindow(
+ KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
+ }
+
+ @Override
+ public String toString() {
+ return super.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
index cb15ea2..d36d9a6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.beam.runners.jstorm.JStormPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
@@ -27,49 +30,45 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
- private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
- /**
- * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated tag
- * is used in downstream consumer. So before output, we need to map this "local" tag to "external"
- * tag. See PCollectionTuple for details.
- */
- public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- if (localTupleTagMap.containsKey(tag)) {
- executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
- } else {
- executorsBolt.processExecutorElem(tag, output);
- }
- }
+ /**
+ * In the multi-output scenario, a "local" tuple tag is currently used by the producer while a
+ * generated tag is used by the downstream consumer. So before output, we need to map this
+ * "local" tag to the "external" tag. See PCollectionTuple for details.
+ */
+ public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ if (localTupleTagMap.containsKey(tag)) {
+ executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
+ } else {
+ executorsBolt.processExecutorElem(tag, output);
+ }
}
+ }
- protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
+ protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
- public MultiOutputDoFnExecutor(
- String stepName,
- String description,
- JStormPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
- Coder<WindowedValue<InputT>> inputCoder,
- WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<InputT> mainInputTag,
- Collection<PCollectionView<?>> sideInputs,
- Map<TupleTag, PCollectionView<?>> sideInputTagToView,
- TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags,
- Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
- ) {
- super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
- sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
- this.localTupleTagMap = localTupleTagMap;
- this.outputManager = new MultiOutputDoFnExecutorOutputManager();
- LOG.info("localTupleTagMap: {}", localTupleTagMap);
- }
+ public MultiOutputDoFnExecutor(
+ String stepName,
+ String description,
+ JStormPipelineOptions pipelineOptions,
+ DoFn<InputT, OutputT> doFn,
+ Coder<WindowedValue<InputT>> inputCoder,
+ WindowingStrategy<?, ?> windowingStrategy,
+ TupleTag<InputT> mainInputTag,
+ Collection<PCollectionView<?>> sideInputs,
+ Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+ TupleTag<OutputT> mainTupleTag,
+ List<TupleTag<?>> sideOutputTags,
+ Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
+ ) {
+ super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
+ sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
+ this.localTupleTagMap = localTupleTagMap;
+ this.outputManager = new MultiOutputDoFnExecutorOutputManager();
+ LOG.info("localTupleTagMap: {}", localTupleTagMap);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
index dd7921f..45ac62a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
@@ -17,10 +17,13 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.jstorm.JStormPipelineOptions;
import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
@@ -29,40 +32,37 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
- public MultiStatefulDoFnExecutor(
- String stepName, String description,
- JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
- Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
- Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
- super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
- }
+ public MultiStatefulDoFnExecutor(
+ String stepName, String description,
+ JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
+ Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
+ TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
+ Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
+ List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+ super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
+ sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
+ }
- @Override
- public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- if (mainInputTag.equals(tag)) {
- WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
- stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
- executorContext.getExecutorsBolt().timerService()));
- stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- processMainInput(elem);
- } else {
- processSideInput(tag, elem);
- }
+ @Override
+ public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+ if (mainInputTag.equals(tag)) {
+ WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
+ stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
+ executorContext.getExecutorsBolt().timerService()));
+ stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
+ kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+ processMainInput(elem);
+ } else {
+ processSideInput(tag, elem);
}
+ }
- @Override
- public void onTimer(Object key, TimerInternals.TimerData timerData) {
- stepContext.setStateInternals(new JStormStateInternals<>(key,
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- super.onTimer(key, timerData);
- }
+ @Override
+ public void onTimer(Object key, TimerInternals.TimerData timerData) {
+ stepContext.setStateInternals(new JStormStateInternals<>(key,
+ kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+ super.onTimer(key, timerData);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
index 7d20a4c..ba0c052 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
@@ -17,10 +17,13 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.jstorm.JStormPipelineOptions;
import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
@@ -29,39 +32,35 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
- public StatefulDoFnExecutor(
- String stepName, String description, JStormPipelineOptions pipelineOptions,
- DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder,
- WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag,
- Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>>
- sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
- super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy,
- mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
- }
+ public StatefulDoFnExecutor(
+ String stepName, String description, JStormPipelineOptions pipelineOptions,
+ DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder,
+ WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag,
+ Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>>
+ sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
+ super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy,
+ mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
+ }
- @Override
- public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- if (mainInputTag.equals(tag)) {
- WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
- stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
- executorContext.getExecutorsBolt().timerService()));
- stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- processMainInput(elem);
- } else {
- processSideInput(tag, elem);
- }
+ @Override
+ public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+ if (mainInputTag.equals(tag)) {
+ WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
+ stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
+ executorContext.getExecutorsBolt().timerService()));
+ stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
+ kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+ processMainInput(elem);
+ } else {
+ processSideInput(tag, elem);
}
+ }
- @Override
- public void onTimer(Object key, TimerInternals.TimerData timerData) {
- stepContext.setStateInternals(new JStormStateInternals<>(key,
- kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- super.onTimer(key, timerData);
- }
+ @Override
+ public void onTimer(Object key, TimerInternals.TimerData timerData) {
+ stepContext.setStateInternals(new JStormStateInternals<>(key,
+ kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+ super.onTimer(key, timerData);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
index 47db018..5c41bda 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,36 +17,35 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
-import org.apache.beam.runners.core.TimerInternals;
-import org.joda.time.Instant;
-
import java.io.Serializable;
import java.util.List;
+import org.apache.beam.runners.core.TimerInternals;
+import org.joda.time.Instant;
/**
* Interface that tracks input watermarks and manages timers in each bolt.
*/
public interface TimerService extends Serializable {
- void init(List<Integer> upStreamTasks);
+ void init(List<Integer> upStreamTasks);
- /**
- *
- * @param task
- * @param inputWatermark
- * @return new watermark if any timer is triggered during the update of watermark, otherwise 0
- */
- long updateInputWatermark(Integer task, long inputWatermark);
+ /**
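+ * Updates the input watermark recorded for the given upstream task.
+ * @param task the upstream task whose input watermark is being updated
+ * @param inputWatermark the new input watermark reported for that task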
+ * @return new watermark if any timer is triggered during the update of watermark, otherwise 0
+ */
+ long updateInputWatermark(Integer task, long inputWatermark);
- long currentInputWatermark();
+ long currentInputWatermark();
- long currentOutputWatermark();
+ long currentOutputWatermark();
- void clearWatermarkHold(String namespace);
+ void clearWatermarkHold(String namespace);
- void addWatermarkHold(String namespace, Instant watermarkHold);
+ void addWatermarkHold(String namespace, Instant watermarkHold);
- void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
+ void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
- void fireTimers(long newWatermark);
+ void fireTimers(long newWatermark);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
index 3b864d5..d2514f1 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,134 +17,139 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
import avro.shaded.com.google.common.collect.Maps;
import avro.shaded.com.google.common.collect.Sets;
import com.alibaba.jstorm.utils.Pair;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
/**
* Default implementation of {@link TimerService}.
*/
public class TimerServiceImpl implements TimerService {
- private transient ExecutorContext executorContext;
- private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
-
- private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark = new ConcurrentHashMap<>();
- private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
- private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
- private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
- private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = new PriorityQueue<>();
- private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
- timerDataToKeyedExecutors = Maps.newHashMap();
-
- private boolean initialized = false;
-
- public TimerServiceImpl() {
- }
-
- public TimerServiceImpl(ExecutorContext executorContext) {
- this.executorContext = executorContext;
- this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
- }
-
- @Override
- public void init(List<Integer> upStreamTasks) {
- for (Integer task : upStreamTasks) {
- upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
- inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
- }
- initialized = true;
- }
-
- @Override
- public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
- checkState(initialized, "TimerService has not been initialized.");
- Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
- // Make sure the input watermark don't go backward.
- if (taskInputWatermark > oldTaskInputWatermark) {
- upStreamTaskToInputWatermark.put(task, taskInputWatermark);
- inputWatermarks.add(taskInputWatermark);
- inputWatermarks.remove(oldTaskInputWatermark);
-
- long newLocalInputWatermark = currentInputWatermark();
- if (newLocalInputWatermark > oldTaskInputWatermark) {
- return newLocalInputWatermark;
- }
- }
- return 0;
+ private transient ExecutorContext executorContext;
+ private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
+
+ private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark =
+ new ConcurrentHashMap<>();
+ private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
+ private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
+ private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
+ private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
+ new PriorityQueue<>();
+ private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
+ timerDataToKeyedExecutors = Maps.newHashMap();
+
+ private boolean initialized = false;
+
+ public TimerServiceImpl() {
+ }
+
+ public TimerServiceImpl(ExecutorContext executorContext) {
+ this.executorContext = executorContext;
+ this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
+ }
+
+ @Override
+ public void init(List<Integer> upStreamTasks) {
+ for (Integer task : upStreamTasks) {
+ upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+ inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
}
-
- @Override
- public void fireTimers(long newWatermark) {
- TimerInternals.TimerData timerData;
- while ((timerData = eventTimeTimersQueue.peek()) != null
- && timerData.getTimestamp().getMillis() <= newWatermark) {
- for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) {
- DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
- executor.onTimer(keyedExecutor.getSecond(), timerData);
- }
- eventTimeTimersQueue.remove();
- timerDataToKeyedExecutors.remove(timerData);
- }
+ initialized = true;
+ }
+
+ @Override
+ public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
+ checkState(initialized, "TimerService has not been initialized.");
+ Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
+ // Make sure the input watermark doesn't go backward.
+ if (taskInputWatermark > oldTaskInputWatermark) {
+ upStreamTaskToInputWatermark.put(task, taskInputWatermark);
+ inputWatermarks.add(taskInputWatermark);
+ inputWatermarks.remove(oldTaskInputWatermark);
+
+ long newLocalInputWatermark = currentInputWatermark();
+ if (newLocalInputWatermark > oldTaskInputWatermark) {
+ return newLocalInputWatermark;
+ }
}
-
- @Override
- public long currentInputWatermark() {
- return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+ return 0;
+ }
+
+ @Override
+ public void fireTimers(long newWatermark) {
+ TimerInternals.TimerData timerData;
+ while ((timerData = eventTimeTimersQueue.peek()) != null
+ && timerData.getTimestamp().getMillis() <= newWatermark) {
+ for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) {
+ DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
+ executor.onTimer(keyedExecutor.getSecond(), timerData);
+ }
+ eventTimeTimersQueue.remove();
+ timerDataToKeyedExecutors.remove(timerData);
}
-
- @Override
- public long currentOutputWatermark() {
- if (watermarkHolds.isEmpty()) {
- return currentInputWatermark();
- } else {
- return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
- }
+ }
+
+ @Override
+ public long currentInputWatermark() {
+ return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+ }
+
+ @Override
+ public long currentOutputWatermark() {
+ if (watermarkHolds.isEmpty()) {
+ return currentInputWatermark();
+ } else {
+ return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
}
-
- @Override
- public void clearWatermarkHold(String namespace) {
- Instant currentHold = namespaceToWatermarkHold.get(namespace);
- if (currentHold != null) {
- watermarkHolds.remove(currentHold);
- namespaceToWatermarkHold.remove(namespace);
- }
+ }
+
+ @Override
+ public void clearWatermarkHold(String namespace) {
+ Instant currentHold = namespaceToWatermarkHold.get(namespace);
+ if (currentHold != null) {
+ watermarkHolds.remove(currentHold);
+ namespaceToWatermarkHold.remove(namespace);
}
-
- @Override
- public void addWatermarkHold(String namespace, Instant watermarkHold) {
- Instant currentHold = namespaceToWatermarkHold.get(namespace);
- if (currentHold == null) {
- namespaceToWatermarkHold.put(namespace, watermarkHold);
- watermarkHolds.add(watermarkHold);
- } else if (currentHold != null && watermarkHold.isBefore(currentHold)) {
- namespaceToWatermarkHold.put(namespace, watermarkHold);
- watermarkHolds.add(watermarkHold);
- watermarkHolds.remove(currentHold);
- }
+ }
+
+ @Override
+ public void addWatermarkHold(String namespace, Instant watermarkHold) {
+ Instant currentHold = namespaceToWatermarkHold.get(namespace);
+ if (currentHold == null) {
+ namespaceToWatermarkHold.put(namespace, watermarkHold);
+ watermarkHolds.add(watermarkHold);
+ } else if (currentHold != null && watermarkHold.isBefore(currentHold)) {
+ namespaceToWatermarkHold.put(namespace, watermarkHold);
+ watermarkHolds.add(watermarkHold);
+ watermarkHolds.remove(currentHold);
}
-
- @Override
- public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
- checkArgument(
- TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
- String.format("Does not support domain: %s.", timerData.getDomain()));
- Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
- if (keyedExecutors == null) {
- keyedExecutors = Sets.newHashSet();
- eventTimeTimersQueue.add(timerData);
- }
- keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
- timerDataToKeyedExecutors.put(timerData, keyedExecutors);
+ }
+
+ @Override
+ public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
+ checkArgument(
+ TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
+ String.format("Does not support domain: %s.", timerData.getDomain()));
+ Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
+ if (keyedExecutors == null) {
+ keyedExecutors = Sets.newHashSet();
+ eventTimeTimersQueue.add(timerData);
}
+ keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
+ timerDataToKeyedExecutors.put(timerData, keyedExecutors);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
index 0fb88ab..2bd5f7d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
@@ -24,108 +24,107 @@ import backtype.storm.tuple.Tuple;
import com.alibaba.jstorm.cache.IKvStore;
import com.alibaba.jstorm.cache.IKvStoreManager;
import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
- private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
-
- private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
- private static final String TIMER_SERVICE_KET = "timer_service_key";
-
- private ExecutorsBolt executorsBolt;
- private IKvStoreManager kvStoreManager;
- private IKvStore<String, TimerService> timerServiceStore;
-
- public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
- this.executorsBolt = executorsBolt;
- this.executorsBolt.setStatefulBolt(true);
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- try {
- executorsBolt.prepare(stormConf, context, collector);
- kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
- timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
- } catch (IOException e) {
- LOG.error("Failed to prepare stateful bolt", e);
- throw new RuntimeException(e.getMessage());
- }
- }
-
- @Override
- public void execute(Tuple input) {
- executorsBolt.execute(input);
- }
-
- @Override
- public void cleanup() {
- executorsBolt.cleanup();
+ private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
+
+ // Id passed to kvStoreManager.getOrCreate() to open the store that holds the timer service.
+ private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
+ // Key under which the TimerService is written to and read from timerServiceStore.
+ // NOTE(review): "KET" looks like a typo for "KEY"; a rename must also update finishBatch
+ // and restore, which reference this constant.
+ private static final String TIMER_SERVICE_KET = "timer_service_key";
+
+ // The wrapped bolt that the lifecycle and tuple-processing calls are delegated to.
+ private ExecutorsBolt executorsBolt;
+ // Obtained from the wrapped bolt's executor context in prepare().
+ private IKvStoreManager kvStoreManager;
+ // Opened in prepare() and re-opened in restore() under TIME_SERVICE_STORE_ID.
+ private IKvStore<String, TimerService> timerServiceStore;
+
+ /**
+ * Wraps the given bolt and marks it as stateful via {@code setStatefulBolt(true)}.
+ *
+ * @param executorsBolt the bolt every call is delegated to
+ */
+ public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
+ executorsBolt.setStatefulBolt(true);
+ this.executorsBolt = executorsBolt;
+ }
+
+ /**
+ * Prepares the wrapped bolt, then takes the KV store manager from its executor context and
+ * opens the timer-service store.
+ *
+ * @throws RuntimeException wrapping the {@link IOException} if any of these steps fails
+ */
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ try {
+ executorsBolt.prepare(stormConf, context, collector);
+ kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
+ timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+ } catch (IOException e) {
+ LOG.error("Failed to prepare stateful bolt", e);
+ // Chain the IOException so its type and stack trace survive the rethrow.
+ throw new RuntimeException(e.getMessage(), e);
}
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- executorsBolt.declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return executorsBolt.getComponentConfiguration();
- }
-
- @Override
- public void initState(Object userState) {
- LOG.info("Begin to init from state: {}", userState);
- restore(userState);
- }
-
- @Override
- public Object finishBatch(long batchId) {
- try {
- timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService());
- } catch (IOException e) {
- LOG.error("Failed to store current timer service status", e);
- throw new RuntimeException(e.getMessage());
- }
- kvStoreManager.checkpoint(batchId);
- return null;
+ }
+
+ /** Forwards the tuple unchanged to the wrapped {@link ExecutorsBolt}. */
+ @Override
+ public void execute(Tuple input) {
+ executorsBolt.execute(input);
+ }
+
+ /** Delegates cleanup to the wrapped {@link ExecutorsBolt}. */
+ @Override
+ public void cleanup() {
+ executorsBolt.cleanup();
+ }
+
+ /** Lets the wrapped {@link ExecutorsBolt} declare its output fields on {@code declarer}. */
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ executorsBolt.declareOutputFields(declarer);
+ }
+
+ /**
+ * Exposes the component configuration of the wrapped bolt.
+ *
+ * @return whatever {@code executorsBolt.getComponentConfiguration()} returns
+ */
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> componentConf = executorsBolt.getComponentConfiguration();
+ return componentConf;
+ }
+
+ /**
+ * Logs the incoming state and hands it to {@code restore(Object)}.
+ *
+ * @param userState state object that is passed on to {@code kvStoreManager.restore}
+ */
+ @Override
+ public void initState(Object userState) {
+ LOG.info("Begin to init from state: {}", userState);
+ restore(userState);
+ }
+
+ /**
+ * Stores the wrapped bolt's current timer service in the timer-service store, then
+ * checkpoints the KV store manager for this batch.
+ *
+ * @param batchId id of the batch being finished; passed to {@code kvStoreManager.checkpoint}
+ * @return always {@code null}
+ * @throws RuntimeException wrapping the {@link IOException} if the timer service cannot be
+ * stored
+ */
+ @Override
+ public Object finishBatch(long batchId) {
+ try {
+ timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService());
+ } catch (IOException e) {
+ LOG.error("Failed to store current timer service status", e);
+ // Chain the IOException so its type and stack trace survive the rethrow.
+ throw new RuntimeException(e.getMessage(), e);
}
-
- @Override
- public Object commit(long batchId, Object state) {
- return kvStoreManager.backup(batchId);
- }
-
- @Override
- public void rollBack(Object userState) {
- LOG.info("Begin to rollback from state: {}", userState);
- restore(userState);
- }
-
- @Override
- public void ackCommit(long batchId, long timeStamp) {
- kvStoreManager.remove(batchId);
- }
-
- private void restore(Object userState) {
- try {
- // restore all states
- kvStoreManager.restore(userState);
-
- // init timer service
- timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
- TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET);
- if (timerService == null) {
- timerService = executorsBolt.initTimerService();
- }
- executorsBolt.setTimerService(timerService);
- } catch (IOException e) {
- LOG.error("Failed to restore state", e);
- throw new RuntimeException(e.getMessage());
- }
+ kvStoreManager.checkpoint(batchId);
+ return null;
+ }
+
+ /**
+ * Backs up the KV stores for the given batch.
+ *
+ * @param batchId id of the batch to back up
+ * @param state not used by this implementation
+ * @return whatever {@code kvStoreManager.backup(batchId)} returns
+ */
+ @Override
+ public Object commit(long batchId, Object state) {
+ Object backup = kvStoreManager.backup(batchId);
+ return backup;
+ }
+
+ /**
+ * Logs the state being rolled back to and hands it to {@code restore(Object)}.
+ *
+ * @param userState state object that is passed on to {@code kvStoreManager.restore}
+ */
+ @Override
+ public void rollBack(Object userState) {
+ LOG.info("Begin to rollback from state: {}", userState);
+ restore(userState);
+ }
+
+ /**
+ * Removes the given batch from the KV store manager.
+ *
+ * @param batchId id of the acknowledged batch
+ * @param timeStamp not used by this implementation
+ */
+ @Override
+ public void ackCommit(long batchId, long timeStamp) {
+ kvStoreManager.remove(batchId);
+ }
+
+ /**
+ * Restores the KV stores from {@code userState}, re-opens the timer-service store and
+ * installs the stored timer service on the wrapped bolt. If the store holds no timer service,
+ * a fresh one is obtained from {@code executorsBolt.initTimerService()}.
+ *
+ * @param userState state object passed to {@code kvStoreManager.restore}
+ * @throws RuntimeException wrapping the {@link IOException} if restoring fails
+ */
+ private void restore(Object userState) {
+ try {
+ // restore all states
+ kvStoreManager.restore(userState);
+
+ // init timer service
+ timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+ TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET);
+ if (timerService == null) {
+ timerService = executorsBolt.initTimerService();
+ }
+ executorsBolt.setTimerService(timerService);
+ } catch (IOException e) {
+ LOG.error("Failed to restore state", e);
+ // Chain the IOException so its type and stack trace survive the rethrow.
+ throw new RuntimeException(e.getMessage(), e);
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
index 22dd07b..16f7d99 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
@@ -24,130 +24,130 @@ import com.alibaba.jstorm.cache.IKvStore;
import com.alibaba.jstorm.cache.IKvStoreManager;
import com.alibaba.jstorm.cache.KvStoreManagerFactory;
import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.Map;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.slf4j.LoggerFactory;
public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
- private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
-
- private static final String SOURCE_STORE_ID = "SourceCheckpoint";
- private static final String CHECKPOINT_MARK = "CheckpointMark";
-
- private UnboundedSourceSpout sourceSpout;
- private UnboundedSource.UnboundedReader reader;
- private IKvStoreManager kvStoreManager;
- private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
-
- public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
- this.sourceSpout = sourceSpout;
- }
-
- private void restore(Object userState) {
- try {
- kvStoreManager.restore(userState);
- sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
- UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
- sourceSpout.createSourceReader(checkpointMark);
- reader = sourceSpout.getUnboundedSourceReader();
- } catch (IOException e) {
- LOG.error("Failed to init state", e);
- throw new RuntimeException(e.getMessage());
- }
- }
-
- @Override
- public void initState(Object userState) {
- restore(userState);
- }
-
- @Override
- public Object finishBatch(long checkpointId) {
- try {
- // Store check point mark from unbounded source reader
- UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
- sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
-
- // checkpoint all kv stores in current manager
- kvStoreManager.checkpoint(checkpointId);
- } catch (IOException e) {
- LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
- throw new RuntimeException(e.getMessage());
- }
- return null;
- }
-
- @Override
- public Object commit(long batchId, Object state) {
- // backup kv stores to remote state backend
- return kvStoreManager.backup(batchId);
- }
-
- @Override
- public void rollBack(Object userState) {
- restore(userState);
- }
-
- @Override
- public void ackCommit(long batchId, long timeStamp) {
- // remove obsolete state in bolt local and remote state backend
- kvStoreManager.remove(batchId);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- sourceSpout.declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return sourceSpout.getComponentConfiguration();
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- try {
- sourceSpout.open(conf, context, collector);
- String storeName = String.format("task-%s", context.getThisTaskId());
- String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
- kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, storePath, true);
-
- reader = sourceSpout.getUnboundedSourceReader();
- } catch (IOException e) {
- LOG.error("Failed to open transactional unbounded source spout", e);
- throw new RuntimeException(e.getMessage());
- }
- }
-
- @Override
- public void close() {
- sourceSpout.close();
- }
-
- @Override
- public void activate() {
- sourceSpout.activate();
- }
-
- @Override
- public void deactivate() {
- sourceSpout.deactivate();
- }
-
- @Override
- public void nextTuple() {
- sourceSpout.nextTuple();
- }
-
- @Override
- public void ack(Object msgId) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void fail(Object msgId) {
- throw new UnsupportedOperationException();
- }
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
+
+ // Id passed to kvStoreManager.getOrCreate() to open the checkpoint store.
+ private static final String SOURCE_STORE_ID = "SourceCheckpoint";
+ // Key under which the reader's checkpoint mark is written to and read from the store.
+ private static final String CHECKPOINT_MARK = "CheckpointMark";
+
+ // The wrapped spout that the spout lifecycle calls are delegated to.
+ private UnboundedSourceSpout sourceSpout;
+ // Reader obtained from sourceSpout in open() and again in restore().
+ private UnboundedSource.UnboundedReader reader;
+ // Created in open() through KvStoreManagerFactory.
+ private IKvStoreManager kvStoreManager;
+ // Opened in restore() under SOURCE_STORE_ID.
+ private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
+
+ /**
+ * Wraps the given spout.
+ *
+ * @param sourceSpout the spout the lifecycle calls are delegated to
+ */
+ public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
+ this.sourceSpout = sourceSpout;
+ }
+
+ /**
+ * Restores the KV stores from {@code userState}, reads the stored checkpoint mark and asks
+ * the wrapped spout to create a source reader from it; the resulting reader replaces
+ * {@code reader}.
+ *
+ * @param userState state object passed to {@code kvStoreManager.restore}
+ * @throws RuntimeException wrapping the {@link IOException} if restoring fails
+ */
+ private void restore(Object userState) {
+ try {
+ kvStoreManager.restore(userState);
+ sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
+ UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
+ sourceSpout.createSourceReader(checkpointMark);
+ reader = sourceSpout.getUnboundedSourceReader();
+ } catch (IOException e) {
+ LOG.error("Failed to init state", e);
+ // Chain the IOException so its type and stack trace survive the rethrow.
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Initializes this spout by handing {@code userState} to {@code restore(Object)}.
+ *
+ * @param userState state object that is passed on to {@code kvStoreManager.restore}
+ */
+ @Override
+ public void initState(Object userState) {
+ restore(userState);
+ }
+
+ /**
+ * Saves the reader's current checkpoint mark into the checkpoint store, then checkpoints all
+ * KV stores of the manager for this batch.
+ *
+ * @param checkpointId id passed to {@code kvStoreManager.checkpoint}
+ * @return always {@code null}
+ * @throws RuntimeException wrapping the {@link IOException} if storing or checkpointing fails
+ */
+ @Override
+ public Object finishBatch(long checkpointId) {
+ try {
+ // Store check point mark from unbounded source reader
+ UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
+ sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
+
+ // checkpoint all kv stores in current manager
+ kvStoreManager.checkpoint(checkpointId);
+ } catch (IOException e) {
+ LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
+ // Chain the IOException so its type and stack trace survive the rethrow.
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ return null;
+ }
+
+ /**
+ * Backs up the KV stores for the given batch.
+ *
+ * @param batchId id of the batch to back up
+ * @param state not used by this implementation
+ * @return whatever {@code kvStoreManager.backup(batchId)} returns
+ */
+ @Override
+ public Object commit(long batchId, Object state) {
+ // backup kv stores to remote state backend
+ Object backup = kvStoreManager.backup(batchId);
+ return backup;
+ }
+
+ /**
+ * Rolls back by handing {@code userState} to {@code restore(Object)}.
+ *
+ * @param userState state object that is passed on to {@code kvStoreManager.restore}
+ */
+ @Override
+ public void rollBack(Object userState) {
+ restore(userState);
+ }
+
+ /**
+ * Removes the given batch from the KV store manager.
+ *
+ * @param batchId id of the acknowledged batch
+ * @param timeStamp not used by this implementation
+ */
+ @Override
+ public void ackCommit(long batchId, long timeStamp) {
+ // remove obsolete state in bolt local and remote state backend
+ kvStoreManager.remove(batchId);
+ }
+
+ /** Lets the wrapped spout declare its output fields on {@code declarer}. */
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ sourceSpout.declareOutputFields(declarer);
+ }
+
+ /**
+ * Exposes the component configuration of the wrapped spout.
+ *
+ * @return whatever {@code sourceSpout.getComponentConfiguration()} returns
+ */
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> componentConf = sourceSpout.getComponentConfiguration();
+ return componentConf;
+ }
+
+ /**
+ * Opens the wrapped spout, creates a KV store manager for this task and picks up the
+ * wrapped spout's reader.
+ *
+ * <p>The store is named {@code task-<taskId>} and placed at
+ * {@code <workerIdDir>/beam/task-<taskId>}.
+ *
+ * @throws RuntimeException wrapping the {@link IOException} if opening fails
+ */
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ try {
+ sourceSpout.open(conf, context, collector);
+ String storeName = String.format("task-%s", context.getThisTaskId());
+ String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+ kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+ context, storeName, storePath, true);
+
+ reader = sourceSpout.getUnboundedSourceReader();
+ } catch (IOException e) {
+ LOG.error("Failed to open transactional unbounded source spout", e);
+ // Chain the IOException so its type and stack trace survive the rethrow.
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ /** Delegates {@code close()} to the wrapped spout. */
+ @Override
+ public void close() {
+ sourceSpout.close();
+ }
+
+ /** Delegates {@code activate()} to the wrapped spout. */
+ @Override
+ public void activate() {
+ sourceSpout.activate();
+ }
+
+ /** Delegates {@code deactivate()} to the wrapped spout. */
+ @Override
+ public void deactivate() {
+ sourceSpout.deactivate();
+ }
+
+ /** Delegates {@code nextTuple()} to the wrapped spout. */
+ @Override
+ public void nextTuple() {
+ sourceSpout.nextTuple();
+ }
+
+ /**
+ * Not supported by this spout.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void ack(Object msgId) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Not supported by this spout.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void fail(Object msgId) {
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file