You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:24 UTC
[24/53] [abbrv] beam git commit: jstorm-runner: move most classes to
translation package and reduce their visibility to package private.
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
new file mode 100644
index 0000000..fce870f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateBinder;
+import org.apache.beam.sdk.state.StateContext;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+/**
+ * JStorm implementation of {@link StateInternals}.
+ *
+ * <p>Each bound state cell is backed by one or more stores obtained from the
+ * {@link IKvStoreManager}; store ids combine the state id with the executor id.
+ *
+ * @param <K> type of the key returned by {@link #getKey()}
+ */
+class JStormStateInternals<K> implements StateInternals {
+
+  /** Prefix of the id of the second, Object-valued store that is passed to bag states. */
+  private static final String STATE_INFO = "state-info:";
+
+  @Nullable
+  private final K key;
+  private final IKvStoreManager kvStoreManager;
+  private final TimerService timerService;
+  private final int executorId;
+
+  /**
+   * Creates state internals scoped to {@code key}.
+   *
+   * @param key the key the state belongs to; may be {@code null}
+   * @param kvStoreManager source of the backing stores; must not be {@code null}
+   * @param timerService passed on to watermark hold states; must not be {@code null}
+   * @param executorId id appended to every store id created by this instance
+   */
+  public JStormStateInternals(@Nullable K key, IKvStoreManager kvStoreManager,
+      TimerService timerService, int executorId) {
+    this.key = key;
+    this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
+    this.timerService = checkNotNull(timerService, "timerService");
+    this.executorId = executorId;
+  }
+
+  @Nullable
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  @Override
+  public <T extends State> T state(
+      StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
+    // TODO: Same implementation as state() without StateContext; the context is ignored.
+    // This might be updated after we figure out if we really need StateContext for JStorm
+    // state internals.
+    return state(namespace, address);
+  }
+
+  /**
+   * Binds the state described by {@code address} within {@code namespace}.
+   *
+   * <p>Set state and combining state with context are not supported and throw
+   * {@link UnsupportedOperationException}. An {@link IOException} raised while obtaining a
+   * backing store is rethrown as a {@link RuntimeException} that carries it as the cause.
+   */
+  @Override
+  public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
+    return address.getSpec().bind(address.getId(), new StateBinder() {
+      @Override
+      public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
+        try {
+          return new JStormValueState<>(
+              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
+        } catch (IOException e) {
+          // Keep the original failure as the cause so it is not lost.
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
+        try {
+          return new JStormBagState(
+              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
+              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          String id,
+          StateSpec<MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder,
+          Coder<ValueT> mapValueCoder) {
+        try {
+          return new JStormMapState<>(
+              getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT> CombiningState bindCombining(
+          String id,
+          StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+        try {
+          // Accumulators are kept in a bag state that the combining state wraps.
+          BagState<AccumT> accumBagState = new JStormBagState(
+              getKey(), namespace,
+              kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
+              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+          return new JStormCombiningState<>(accumBagState, combineFn);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+
+      @Override
+      public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
+          bindCombiningWithContext(
+              String id,
+              StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
+              CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public WatermarkHoldState bindWatermark(
+          String id,
+          StateSpec<WatermarkHoldState> spec,
+          final TimestampCombiner timestampCombiner) {
+        try {
+          BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
+              getKey(), namespace,
+              kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
+              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+
+          // Holds are merged pairwise by delegating to the timestamp combiner.
+          Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
+              new BinaryCombineFn<Instant>() {
+                @Override
+                public Instant apply(Instant left, Instant right) {
+                  return timestampCombiner.combine(left, right);
+                }
+              };
+          return new JStormWatermarkHoldState(
+              namespace,
+              new JStormCombiningState<>(
+                  accumBagState,
+                  outputTimeCombineFn),
+              timestampCombiner,
+              timerService);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+  }
+
+  /** Returns the store id for {@code stateId}: the state id and executor id joined by '-'. */
+  private String getStoreId(String stateId) {
+    return String.format("%s-%s", stateId, executorId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
new file mode 100644
index 0000000..4c96541
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * JStorm implementation of {@link TimerInternals}.
+ *
+ * <p>Timers are registered with the {@link TimerService}; deleting timers is not supported.
+ *
+ * @param <K> type of the key the timers are set for
+ */
+class JStormTimerInternals<K> implements TimerInternals {
+
+  /** Message of the exception thrown by every {@code deleteTimer} overload. */
+  private static final String CANCEL_UNSUPPORTED = "Canceling of a timer is not yet supported.";
+
+  private final K key;
+  private final DoFnExecutor<?, ?> doFnExecutor;
+  private final TimerService timerService;
+
+  /**
+   * @param key key the timers belong to; may be {@code null}
+   * @param doFnExecutor executor handed to the timer service together with each timer
+   * @param timerService service that stores timers and reports watermarks
+   */
+  public JStormTimerInternals(
+      @Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
+    this.key = key;
+    this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
+    this.timerService = checkNotNull(timerService, "timerService");
+  }
+
+  /** Builds the {@link TimerData} for the arguments and registers it. */
+  @Override
+  public void setTimer(
+      StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+    TimerData timer = TimerData.of(timerId, namespace, target, timeDomain);
+    setTimer(timer);
+  }
+
+  /** Registers {@code timerData} with the timer service for this key and executor. */
+  @Override
+  @Deprecated
+  public void setTimer(TimerData timerData) {
+    timerService.setTimer(key, timerData, doFnExecutor);
+  }
+
+  /** Not supported; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException(CANCEL_UNSUPPORTED);
+  }
+
+  /** Not supported; always throws {@link UnsupportedOperationException}. */
+  @Override
+  @Deprecated
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException(CANCEL_UNSUPPORTED);
+  }
+
+  /** Not supported; always throws {@link UnsupportedOperationException}. */
+  @Override
+  @Deprecated
+  public void deleteTimer(TimerData timerData) {
+    throw new UnsupportedOperationException(CANCEL_UNSUPPORTED);
+  }
+
+  /** Returns the current wall-clock time. */
+  @Override
+  public Instant currentProcessingTime() {
+    return Instant.now();
+  }
+
+  /** Always returns {@code null}. */
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return null;
+  }
+
+  /** Returns the input watermark reported by the timer service. */
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return new Instant(timerService.currentInputWatermark());
+  }
+
+  /** Returns the output watermark reported by the timer service. */
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return new Instant(timerService.currentOutputWatermark());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
new file mode 100644
index 0000000..5d79d21
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.ValueState;
+
+/**
+ * JStorm implementation of {@link ValueState}.
+ *
+ * <p>The value is kept in an {@link IKvStore} under a {@link ComposedKey} built from the key and
+ * the state namespace.
+ *
+ * @param <K> type of the key the value is scoped to
+ * @param <T> type of the stored value
+ */
+class JStormValueState<K, T> implements ValueState<T> {
+
+  @Nullable
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<ComposedKey, T> kvState;
+
+  JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+    this.key = key;
+    this.namespace = namespace;
+    this.kvState = kvState;
+  }
+
+  /**
+   * Stores {@code t} for this key and namespace.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the store fails
+   */
+  @Override
+  public void write(T t) {
+    try {
+      kvState.put(getComposedKey(), t);
+    } catch (IOException e) {
+      // Chain the IOException so the underlying store failure stays visible.
+      throw new RuntimeException(String.format(
+          "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t), e);
+    }
+  }
+
+  /**
+   * Returns whatever the store holds for this key and namespace.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the store fails
+   */
+  @Override
+  public T read() {
+    try {
+      return kvState.get(getComposedKey());
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to read key: %s, namespace: %s.", key, namespace), e);
+    }
+  }
+
+  @Override
+  public ValueState<T> readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  /**
+   * Removes the entry for this key and namespace from the store.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the store fails
+   */
+  @Override
+  public void clear() {
+    try {
+      kvState.remove(getComposedKey());
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to clear key: %s, namespace: %s.", key, namespace), e);
+    }
+  }
+
+  /** Builds the store key from the key and the namespace. */
+  private ComposedKey getComposedKey() {
+    return ComposedKey.of(key, namespace);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
new file mode 100644
index 0000000..7e1c28f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.GroupingState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+/**
+ * JStorm implementation of {@link WatermarkHoldState}.
+ *
+ * <p>Every hold is forwarded both to the {@link TimerService}, keyed by the namespace's string
+ * key, and to the wrapped {@link GroupingState}.
+ */
+class JStormWatermarkHoldState implements WatermarkHoldState {
+
+  private final StateNamespace namespace;
+  private final GroupingState<Instant, Instant> watermarkHoldsState;
+  private final TimestampCombiner timestampCombiner;
+  private final TimerService timerService;
+
+  /** Creates the hold state; every argument must be non-null. */
+  JStormWatermarkHoldState(
+      StateNamespace namespace,
+      GroupingState<Instant, Instant> watermarkHoldsState,
+      TimestampCombiner timestampCombiner,
+      TimerService timerService) {
+    this.namespace = checkNotNull(namespace, "namespace");
+    this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
+    this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
+    this.timerService = checkNotNull(timerService, "timerService");
+  }
+
+  @Override
+  public TimestampCombiner getTimestampCombiner() {
+    return timestampCombiner;
+  }
+
+  /** Registers the hold with the timer service, then adds it to the wrapped state. */
+  @Override
+  public void add(Instant instant) {
+    final String holdKey = namespace.stringKey();
+    timerService.addWatermarkHold(holdKey, instant);
+    watermarkHoldsState.add(instant);
+  }
+
+  /** Delegates to the wrapped state. */
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    final ReadableState<Boolean> empty = watermarkHoldsState.isEmpty();
+    return empty;
+  }
+
+  /** Delegates to the wrapped state. */
+  @Override
+  public Instant read() {
+    final Instant hold = watermarkHoldsState.read();
+    return hold;
+  }
+
+  @Override
+  public WatermarkHoldState readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  /** Clears the hold in the timer service, then clears the wrapped state. */
+  @Override
+  public void clear() {
+    final String holdKey = namespace.stringKey();
+    timerService.clearWatermarkHold(holdKey);
+    watermarkHoldsState.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
new file mode 100644
index 0000000..82d8bdc
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
+import com.alibaba.jstorm.common.metric.AsmCounter;
+import com.alibaba.jstorm.metric.MetricClient;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine.
+ *
+ * <p>Only counters are reported. The last reported value of each counter is remembered so that
+ * each call to {@link #updateMetrics()} sends only the increase since the previous call.
+ */
+class MetricsReporter {
+
+  private static final String METRIC_KEY_SEPARATOR = "__";
+  private static final String COUNTER_PREFIX = "__counter";
+
+  private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
+  /** Last value reported to JStorm, keyed by the JStorm metric name. */
+  private final Map<String, Long> reportedCounters = Maps.newHashMap();
+  private final MetricClient metricClient;
+
+  /**
+   * Creates a reporter that publishes through {@code metricClient}.
+   *
+   * @param metricClient client used to register counters; must not be {@code null}
+   */
+  public static MetricsReporter create(MetricClient metricClient) {
+    return new MetricsReporter(metricClient);
+  }
+
+  private MetricsReporter(MetricClient metricClient) {
+    this.metricClient = checkNotNull(metricClient, "metricClient");
+  }
+
+  /** Returns the metrics container of the given step from the held step map. */
+  public MetricsContainer getMetricsContainer(String stepName) {
+    return metricsContainers.getContainer(stepName);
+  }
+
+  /** Queries the attempted metrics of all containers and reports the counters. */
+  public void updateMetrics() {
+    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
+    MetricQueryResults metricQueryResults =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+    updateCounters(metricQueryResults.counters());
+  }
+
+  /**
+   * Sends the increase of each counter since its last report.
+   *
+   * <p>A counter is skipped when its attempted value has not grown past the last reported value.
+   */
+  private void updateCounters(Iterable<MetricResult<Long>> counters) {
+    for (MetricResult<Long> metricResult : counters) {
+      String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
+      Long updateValue = metricResult.attempted();
+      Long oldValue = reportedCounters.get(metricName);
+
+      if (oldValue == null || oldValue < updateValue) {
+        AsmCounter counter = metricClient.registerCounter(metricName);
+        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
+        counter.update(incValue);
+        // Remember what has been reported; without this the full cumulative value would be
+        // added again on every call.
+        reportedCounters.put(metricName, updateValue);
+      }
+    }
+  }
+
+  /** Joins prefix, step, metric namespace and metric name with {@code "__"}. */
+  private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
+    return prefix
+        + METRIC_KEY_SEPARATOR + metricResult.step()
+        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
+        + METRIC_KEY_SEPARATOR + metricResult.name().name();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
new file mode 100644
index 0000000..49b0f85
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm {@link Executor} for {@link DoFn} with multi-output.
+ *
+ * @param <InputT> element type of the main input
+ * @param <OutputT> element type of the main output
+ */
+class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
+
+  /** Mapping from "local" tuple tags to the "external" tags elements are emitted with. */
+  protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
+
+  /**
+   * Creates the executor and installs a {@link MultiOutputDoFnExecutorOutputManager} as the
+   * output manager.
+   *
+   * @param localTupleTagMap mapping from local to external tuple tags; all other arguments are
+   *     passed unchanged to the superclass constructor
+   */
+  public MultiOutputDoFnExecutor(
+      String stepName,
+      String description,
+      JStormPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Coder<WindowedValue<InputT>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<InputT> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags,
+      Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
+  ) {
+    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
+        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
+    this.localTupleTagMap = localTupleTagMap;
+    this.outputManager = new MultiOutputDoFnExecutorOutputManager();
+    LOG.info("localTupleTagMap: {}", localTupleTagMap);
+  }
+
+  /**
+   * For multi-output scenario, a "local" tuple tag is used in producer currently while a generated
+   * tag is used in downstream consumer. So before output, we need to map this "local" tag to
+   * "external" tag. See PCollectionTuple for details.
+   */
+  public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
+    /** Emits {@code output} under the mapped tag if one is registered, else under {@code tag}. */
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      TupleTag<T> externalTag = localTupleTagMap.containsKey(tag)
+          ? (TupleTag<T>) localTupleTagMap.get(tag)
+          : tag;
+      executorsBolt.processExecutorElem(externalTag, output);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
new file mode 100644
index 0000000..a3ffc30
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * JStorm {@link Executor} for stateful {@link DoFn} with multi-output.
+ *
+ * <p>Before a main-input element or a timer is processed, the step context is given state
+ * internals scoped to the element's or timer's key.
+ *
+ * @param <OutputT> element type of the main output
+ */
+class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
+
+  /** Passes all arguments unchanged to the superclass constructor. */
+  public MultiStatefulDoFnExecutor(
+      String stepName, String description,
+      JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
+      Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
+        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
+  }
+
+  /**
+   * Processes {@code elem} as main input when {@code tag} is the main input tag, otherwise as
+   * side input. For main input, timer and state internals keyed by the element's key are
+   * installed on the step context first.
+   */
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    if (mainInputTag.equals(tag)) {
+      WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
+      // Extract the key once; it scopes both the timer and the state internals.
+      Object key = kvElem.getValue().getKey();
+      stepContext.setTimerInternals(new JStormTimerInternals<>(key, this,
+          executorContext.getExecutorsBolt().timerService()));
+      stepContext.setStateInternals(new JStormStateInternals<>(key,
+          kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+      processMainInput(elem);
+    } else {
+      processSideInput(tag, elem);
+    }
+  }
+
+  /** Installs state internals for {@code key}, then delegates to the superclass. */
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    stepContext.setStateInternals(new JStormStateInternals<>(key,
+        kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+    super.onTimer(key, timerData);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
new file mode 100644
index 0000000..7daa1cb
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a multi-output {@link ParDo.MultiOutput} into a JStorm {@link DoFnExecutor}.
+ *
+ * <p>A {@link DoFn} that declares state or timers is wrapped in a
+ * {@code MultiStatefulDoFnExecutor}; every other DoFn is wrapped in a
+ * {@code MultiOutputDoFnExecutor}.
+ */
+class ParDoBoundMultiTranslator<InputT, OutputT>
+    extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
+
+  /**
+   * Builds the executor for {@code transform} and registers it, together with the transform's
+   * side inputs, on the translation context.
+   *
+   * @param transform the multi-output ParDo being translated
+   * @param context the translation context holding the user graph and collecting executors
+   */
+  @Override
+  public void translateNode(
+      ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+    final TranslationContext.UserGraphContext graph = context.getUserGraphContext();
+    final TupleTag<InputT> mainInputTag = (TupleTag<InputT>) graph.getInputTag();
+    final PCollection<InputT> mainInput = (PCollection<InputT>) graph.getInput();
+
+    // Map each local output tag to the first tag of its output value's expansion.
+    Map<TupleTag<?>, PValue> outputs = Maps.newHashMap(graph.getOutputs());
+    Map<TupleTag<?>, TupleTag<?>> localToExternalTags = Maps.newHashMap();
+    for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+      Iterator<TupleTag<?>> expandedTags =
+          ((PValueBase) output.getValue()).expand().keySet().iterator();
+      localToExternalTags.put(output.getKey(), expandedTags.next());
+    }
+
+    final TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) graph.getOutputTag();
+    // NOTE(review): this removes the main tag from the very list getOutputTags() returned;
+    // presumably that is a fresh copy - verify it is not shared context state.
+    List<TupleTag<?>> additionalOutputTags = graph.getOutputTags();
+    additionalOutputTags.remove(mainOutputTag);
+
+    // The description covers the regular inputs plus every side-input view.
+    Map<TupleTag<?>, PValue> inputs = Maps.newHashMap(graph.getInputs());
+    for (PCollectionView view : transform.getSideInputs()) {
+      inputs.put(graph.findTupleTag(view), view);
+    }
+    final String description = describeTransform(transform, inputs, outputs);
+
+    ImmutableMap.Builder<TupleTag, PCollectionView<?>> viewsByTag = ImmutableMap.builder();
+    for (PCollectionView view : transform.getSideInputs()) {
+      viewsByTag.put(graph.findTupleTag(view), view);
+    }
+
+    final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    final boolean declaresStateOrTimers =
+        !signature.stateDeclarations().isEmpty() || !signature.timerDeclarations().isEmpty();
+
+    final DoFnExecutor executor;
+    if (!declaresStateOrTimers) {
+      executor = new MultiOutputDoFnExecutor<>(
+          graph.getStepName(),
+          description,
+          graph.getOptions(),
+          transform.getFn(),
+          WindowedValue.getFullCoder(
+              mainInput.getCoder(),
+              mainInput.getWindowingStrategy().getWindowFn().windowCoder()),
+          mainInput.getWindowingStrategy(),
+          mainInputTag,
+          transform.getSideInputs(),
+          viewsByTag.build(),
+          mainOutputTag,
+          additionalOutputTags,
+          localToExternalTags);
+    } else {
+      // The stateful executor is declared over KV input, hence the unchecked casts.
+      executor = new MultiStatefulDoFnExecutor<>(
+          graph.getStepName(),
+          description,
+          graph.getOptions(),
+          (DoFn<KV, OutputT>) transform.getFn(),
+          (Coder) WindowedValue.getFullCoder(
+              mainInput.getCoder(),
+              mainInput.getWindowingStrategy().getWindowFn().windowCoder()),
+          mainInput.getWindowingStrategy(),
+          (TupleTag<KV>) mainInputTag,
+          transform.getSideInputs(),
+          viewsByTag.build(),
+          mainOutputTag,
+          additionalOutputTags,
+          localToExternalTags);
+    }
+
+    context.addTransformExecutor(
+        executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
new file mode 100644
index 0000000..6feb7f8
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a single-output {@link ParDo.SingleOutput} into a JStorm {@link DoFnExecutor}.
+ *
+ * <p>A {@link DoFn} that declares state or timers is wrapped in a {@link StatefulDoFnExecutor};
+ * every other DoFn is wrapped in a plain {@link DoFnExecutor}.
+ */
+class ParDoBoundTranslator<InputT, OutputT>
+    extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
+
+  /**
+   * Builds the executor for {@code transform} and registers it, together with the transform's
+   * side inputs, on the translation context.
+   *
+   * @param transform the single-output ParDo being translated
+   * @param context the translation context holding the user graph and collecting executors
+   */
+  @Override
+  public void translateNode(
+      ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
+    final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    final TupleTag<?> inputTag = userGraphContext.getInputTag();
+    PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+
+    TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+    // A single-output ParDo has no additional outputs.
+    List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
+
+    // Use the project's own Guava rather than the copy shaded inside Avro, which is an
+    // implementation detail of that library and may disappear with any Avro upgrade.
+    Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
+    String description = describeTransform(
+        transform,
+        allInputs,
+        userGraphContext.getOutputs());
+
+    ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView =
+        ImmutableMap.builder();
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
+
+    DoFnExecutor executor;
+    DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    if (signature.stateDeclarations().size() > 0
+        || signature.timerDeclarations().size() > 0) {
+      // The stateful executor is declared over KV input, hence the unchecked casts.
+      executor = new StatefulDoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          (DoFn<KV, OutputT>) transform.getFn(),
+          (Coder) WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          (TupleTag<KV>) inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags);
+    } else {
+      executor = new DoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          transform.getFn(),
+          WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          (TupleTag<InputT>) inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags);
+    }
+
+    context.addTransformExecutor(
+        executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java
new file mode 100644
index 0000000..4f469f3
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Static helpers shared by the JStorm runner's translation classes.
+ */
+class RunnerUtils {
+  /**
+   * Wraps a windowed key/value element into a single-element {@link KeyedWorkItem}.
+   *
+   * @param elem the windowed {@link KV} to convert
+   * @return a work item whose key is the KV's key and whose only element is
+   *     {@code elem.withValue(...)} applied to the KV's value
+   */
+  public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
+    final KV<K, V> keyValue = elem.getValue();
+    return SingletonKeyedWorkItem.of(keyValue.getKey(), elem.withValue(keyValue.getValue()));
+  }
+
+  /**
+   * Tells whether {@code executor} is one of the executor kinds this runner treats like a
+   * group-by-key: a {@link GroupByWindowExecutor}, a {@link StatefulDoFnExecutor} or a
+   * {@link MultiStatefulDoFnExecutor}.
+   *
+   * @param executor the executor to classify; {@code null} yields {@code false}
+   * @return {@code true} if the executor is an instance of any of the types above
+   */
+  public static boolean isGroupByKeyExecutor(Executor executor) {
+    return executor instanceof GroupByWindowExecutor
+        || executor instanceof StatefulDoFnExecutor
+        || executor instanceof MultiStatefulDoFnExecutor;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java
new file mode 100644
index 0000000..14d2972
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Carries {@link PipelineOptions} in serialized form so they can be shipped to the cluster.
+ *
+ * <p>The options are written to bytes with a Jackson {@link ObjectMapper} at construction time
+ * and read back on the first call to {@link #getPipelineOptions()}.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  /** The options as written by the {@link ObjectMapper}; this is the state that is shipped. */
+  private final byte[] serializedOptions;
+
+  /**
+   * Deserialized copy of the options. Transient, so it is rebuilt lazily from
+   * {@link #serializedOptions} when first requested.
+   */
+  private transient PipelineOptions pipelineOptions;
+
+  /**
+   * Serializes the given options.
+   *
+   * @param options the options to capture; must not be null
+   * @throws RuntimeException if the options cannot be serialized
+   */
+  public SerializedPipelineOptions(PipelineOptions options) {
+    checkNotNull(options, "PipelineOptions must not be null.");
+
+    final byte[] encoded;
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+      new ObjectMapper().writeValue(buffer, options);
+      encoded = buffer.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+    this.serializedOptions = encoded;
+  }
+
+  /**
+   * Returns the options, deserializing them on first use.
+   *
+   * @return the deserialized options
+   * @throws RuntimeException if the stored bytes cannot be deserialized
+   */
+  public PipelineOptions getPipelineOptions() {
+    if (pipelineOptions != null) {
+      return pipelineOptions;
+    }
+
+    try {
+      pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+    }
+    return pipelineOptions;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java
new file mode 100644
index 0000000..b321c76
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.Collections;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A {@link KeyedWorkItem} that carries exactly one element and no timers.
+ *
+ * @param <K> type of the key
+ * @param <ElemT> type of the element's value
+ */
+class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
+
+  final K key;
+  final WindowedValue<ElemT> value;
+
+  private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  /**
+   * Creates a work item for {@code key} holding the single element {@code value}.
+   *
+   * @param key the key of the work item
+   * @param value the only element of the work item
+   * @return the new work item
+   */
+  public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
+    return new SingletonKeyedWorkItem<K, ElemT>(key, value);
+  }
+
+  @Override
+  public K key() {
+    return key;
+  }
+
+  /** Returns the single element held by this work item. */
+  public WindowedValue<ElemT> value() {
+    return value;
+  }
+
+  /** Returns an empty iterable: this work item never carries timers. */
+  @Override
+  public Iterable<TimerInternals.TimerData> timersIterable() {
+    // Type-safe equivalent of the raw Collections.EMPTY_LIST; avoids an unchecked conversion.
+    return Collections.<TimerInternals.TimerData>emptyList();
+  }
+
+  /** Returns an iterable over the single element held by this work item. */
+  @Override
+  public Iterable<WindowedValue<ElemT>> elementsIterable() {
+    return Collections.singletonList(value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
new file mode 100644
index 0000000..911f259
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * JStorm {@link Executor} for a stateful {@link DoFn} over {@link KV} input.
+ *
+ * <p>Before each main-input element is processed, the step context's timer and state internals
+ * are rebuilt around that element's key.
+ *
+ * @param <OutputT> type of the DoFn's main output
+ */
+class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
+  /** Passes every argument through unchanged to the {@link DoFnExecutor} constructor. */
+  public StatefulDoFnExecutor(
+      String stepName,
+      String description,
+      JStormPipelineOptions pipelineOptions,
+      DoFn<KV, OutputT> doFn,
+      Coder<WindowedValue<KV>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags) {
+    super(
+        stepName,
+        description,
+        pipelineOptions,
+        doFn,
+        inputCoder,
+        windowingStrategy,
+        mainInputTag,
+        sideInputs,
+        sideInputTagToView,
+        mainTupleTag,
+        sideOutputTags);
+  }
+
+  /**
+   * Dispatches an element to main-input or side-input processing depending on its tag.
+   *
+   * @param tag the tag of the input the element arrived on
+   * @param elem the element to process; main-input elements are treated as windowed {@link KV}s
+   */
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    if (!mainInputTag.equals(tag)) {
+      processSideInput(tag, elem);
+      return;
+    }
+
+    // Main input: scope timer and state internals to this element's key before running the DoFn.
+    final Object elementKey = ((WindowedValue<KV>) elem).getValue().getKey();
+    stepContext.setTimerInternals(new JStormTimerInternals(
+        elementKey, this, executorContext.getExecutorsBolt().timerService()));
+    stepContext.setStateInternals(new JStormStateInternals<>(
+        elementKey, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+    processMainInput(elem);
+  }
+
+  /**
+   * Handles a fired timer: installs state internals scoped to {@code key}, then delegates to the
+   * superclass.
+   *
+   * @param key the key the timer was registered for
+   * @param timerData the timer that fired
+   */
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    // NOTE(review): only the state internals are re-keyed here; unless super.onTimer() does so,
+    // the timer internals still carry the key installed by the last process() call - confirm.
+    stepContext.setStateInternals(
+        new JStormStateInternals<>(
+            key,
+            kvStoreManager,
+            executorsBolt.timerService(),
+            internalDoFnExecutorId));
+    super.onTimer(key, timerData);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java
new file mode 100644
index 0000000..30ff18c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Describes a stream connection: the upstream {@link Producer} that emits it and the downstream
+ * {@link Consumer} that receives it.
+ */
+@AutoValue
+public abstract class Stream {
+
+  /** Returns the upstream end of the connection. */
+  public abstract Producer getProducer();
+
+  /** Returns the downstream end of the connection. */
+  public abstract Consumer getConsumer();
+
+  /** Creates a stream connecting {@code producer} to {@code consumer}. */
+  public static Stream of(Producer producer, Consumer consumer) {
+    return new AutoValue_Stream(producer, consumer);
+  }
+
+  /**
+   * The producing side of a stream: a component together with the id and name of the stream it
+   * emits.
+   */
+  @AutoValue
+  public abstract static class Producer {
+    public abstract String getComponentId();
+
+    public abstract String getStreamId();
+
+    public abstract String getStreamName();
+
+    /** Creates a producer from its component id, stream id and stream name. */
+    public static Producer of(String componentId, String streamId, String streamName) {
+      return new AutoValue_Stream_Producer(componentId, streamId, streamName);
+    }
+  }
+
+  /**
+   * The consuming side of a stream: a component together with the {@link Grouping} it uses to
+   * receive messages.
+   */
+  @AutoValue
+  public abstract static class Consumer {
+    public abstract String getComponentId();
+
+    public abstract Grouping getGrouping();
+
+    /** Creates a consumer from its component id and grouping. */
+    public static Consumer of(String componentId, Grouping grouping) {
+      return new AutoValue_Stream_Consumer(componentId, grouping);
+    }
+  }
+
+  /**
+   * A JStorm grouping, which defines how messages are transferred between two nodes.
+   */
+  @AutoValue
+  public abstract static class Grouping {
+    public abstract Type getType();
+
+    /** Returns the key fields; {@code null} for groupings created through {@link #of(Type)}. */
+    @Nullable
+    public abstract List<String> getFields();
+
+    /**
+     * Creates a grouping without key fields.
+     *
+     * @param type any grouping type except {@link Type#FIELDS}
+     * @throws IllegalArgumentException if {@code type} is {@link Type#FIELDS}; use
+     *     {@link #byFields(List)} for that
+     */
+    public static Grouping of(Type type) {
+      checkArgument(type != Type.FIELDS, "Fields grouping should provide key fields.");
+      return new AutoValue_Stream_Grouping(type, null /* fields */);
+    }
+
+    /**
+     * Creates a {@link Type#FIELDS} grouping over the given key fields.
+     *
+     * @param fields the key fields; must be non-null and non-empty
+     * @throws NullPointerException if {@code fields} is null
+     * @throws IllegalArgumentException if {@code fields} is empty
+     */
+    public static Grouping byFields(List<String> fields) {
+      checkNotNull(fields, "fields");
+      checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
+      return new AutoValue_Stream_Grouping(Type.FIELDS, fields);
+    }
+
+    /**
+     * Types of stream groupings Storm allows.
+     */
+    public enum Type {
+      ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
new file mode 100644
index 0000000..29345aa
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.runners.core.TimerInternals;
+import org.joda.time.Instant;
+
+/**
+ * Interface that tracks input watermarks and manages timers in each bolt.
+ */
+interface TimerService extends Serializable {
+
+ /**
+ * Registers the upstream tasks whose input watermarks this service tracks.
+ *
+ * @param upStreamTasks ids of the upstream tasks
+ */
+ void init(List<Integer> upStreamTasks);
+
+ /**
+ * Reports an input watermark received from an upstream task.
+ *
+ * @param task id of the upstream task that reported the watermark
+ * @param inputWatermark the watermark reported by {@code task}; {@link TimerServiceImpl}
+ * compares it against {@code Instant#getMillis()} values, i.e. milliseconds
+ * @return the new input watermark if the update changed it, otherwise 0.
+ * NOTE(review): {@link TimerServiceImpl} returns the new value whenever the overall input
+ * watermark advances; it does not check whether a timer fired - confirm intended contract.
+ */
+ long updateInputWatermark(Integer task, long inputWatermark);
+
+ /** Returns the current input watermark. */
+ long currentInputWatermark();
+
+ /**
+ * Returns the current output watermark; in {@link TimerServiceImpl} this is the input
+ * watermark capped by the earliest watermark hold, if any.
+ */
+ long currentOutputWatermark();
+
+ /**
+ * Removes the watermark hold registered for the given namespace, if there is one.
+ *
+ * @param namespace the namespace whose hold is cleared
+ */
+ void clearWatermarkHold(String namespace);
+
+ /**
+ * Registers a watermark hold for the given namespace.
+ *
+ * @param namespace the namespace that owns the hold
+ * @param watermarkHold the instant to hold the output watermark at
+ */
+ void addWatermarkHold(String namespace, Instant watermarkHold);
+
+ /**
+ * Registers a timer for a key on behalf of an executor.
+ *
+ * @param key the key the timer belongs to
+ * @param timerData the timer to register
+ * @param doFnExecutor the executor to call back when the timer fires
+ */
+ void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
+
+ /**
+ * Fires the registered timers that are due at the given watermark.
+ *
+ * @param newWatermark the watermark up to which timers should be fired
+ */
+ void fireTimers(long newWatermark);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
new file mode 100644
index 0000000..c2600e5
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.alibaba.jstorm.utils.Pair;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+
+/**
+ * Default implementation of {@link TimerService}.
+ *
+ * <p>The input watermark is the minimum of the watermarks reported by all upstream tasks; the
+ * output watermark is additionally capped by the earliest registered watermark hold. Only
+ * event-time timers are supported.
+ */
+class TimerServiceImpl implements TimerService {
+  private transient ExecutorContext executorContext;
+  private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
+
+  /** Latest watermark reported by each upstream task. */
+  private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark =
+      new ConcurrentHashMap<>();
+  /** Same values as above (one entry per task), ordered so that the minimum is at the head. */
+  private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
+  /** All active holds, earliest first; contains one entry per namespace that has a hold. */
+  private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
+  private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
+  // NOTE(review): a transient final field is null after Java deserialization; presumably
+  // instances are always rebuilt through a constructor - confirm.
+  private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
+      new PriorityQueue<>();
+  /** For every queued timer, the (executor id, key) pairs to call back when it fires. */
+  private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
+      timerDataToKeyedExecutors = Maps.newHashMap();
+
+  private boolean initialized = false;
+
+  public TimerServiceImpl() {
+  }
+
+  public TimerServiceImpl(ExecutorContext executorContext) {
+    this.executorContext = executorContext;
+    this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
+  }
+
+  /**
+   * Starts tracking the given upstream tasks, each at the minimum timestamp.
+   *
+   * @param upStreamTasks ids of the upstream tasks
+   */
+  @Override
+  public void init(List<Integer> upStreamTasks) {
+    for (Integer task : upStreamTasks) {
+      upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+      inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    }
+    initialized = true;
+  }
+
+  /**
+   * Records a watermark reported by an upstream task; watermarks that do not advance the task's
+   * previous value are ignored.
+   *
+   * @param task id of the reporting upstream task; must have been passed to {@link #init}
+   * @param taskInputWatermark the watermark reported by the task, in milliseconds
+   * @return the new input watermark if this update advanced it, otherwise 0
+   * @throws IllegalStateException if {@link #init} has not been called
+   * @throws IllegalArgumentException if {@code task} is not a known upstream task
+   */
+  @Override
+  public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
+    checkState(initialized, "TimerService has not been initialized.");
+    Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
+    // Fail with a clear message instead of an NPE from unboxing below.
+    checkArgument(oldTaskInputWatermark != null, "Unknown upstream task: %s", task);
+    // Make sure the input watermark doesn't go backward.
+    if (taskInputWatermark > oldTaskInputWatermark) {
+      upStreamTaskToInputWatermark.put(task, taskInputWatermark);
+      inputWatermarks.add(taskInputWatermark);
+      inputWatermarks.remove(oldTaskInputWatermark);
+
+      // The minimum can only have moved if this task was holding it back, in which case the
+      // old minimum equals the task's old watermark.
+      long newLocalInputWatermark = currentInputWatermark();
+      if (newLocalInputWatermark > oldTaskInputWatermark) {
+        return newLocalInputWatermark;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Fires, in queue order, every event-time timer whose timestamp is at or before
+   * {@code newWatermark}.
+   *
+   * @param newWatermark the watermark, in milliseconds, up to which timers are fired
+   */
+  @Override
+  public void fireTimers(long newWatermark) {
+    TimerInternals.TimerData timerData;
+    while ((timerData = eventTimeTimersQueue.peek()) != null
+        && timerData.getTimestamp().getMillis() <= newWatermark) {
+      // Unregister the timer before running its callbacks. A callback may register timers
+      // through setTimer(); doing the removal afterwards could then drop a different queue
+      // head or modify the set being iterated.
+      eventTimeTimersQueue.remove();
+      Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.remove(timerData);
+      for (Pair<Integer, Object> keyedExecutor : keyedExecutors) {
+        DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
+        executor.onTimer(keyedExecutor.getSecond(), timerData);
+      }
+    }
+  }
+
+  /**
+   * Returns the minimum watermark over all upstream tasks, or the minimum timestamp when the
+   * service is not initialized or tracks no upstream task.
+   */
+  @Override
+  public long currentInputWatermark() {
+    Long minInputWatermark = initialized ? inputWatermarks.peek() : null;
+    if (minInputWatermark == null) {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    }
+    return minInputWatermark;
+  }
+
+  /** Returns the input watermark, capped by the earliest watermark hold if there is one. */
+  @Override
+  public long currentOutputWatermark() {
+    if (watermarkHolds.isEmpty()) {
+      return currentInputWatermark();
+    } else {
+      return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
+    }
+  }
+
+  /**
+   * Removes the hold registered for {@code namespace}; does nothing if there is none.
+   *
+   * @param namespace the namespace whose hold is cleared
+   */
+  @Override
+  public void clearWatermarkHold(String namespace) {
+    Instant currentHold = namespaceToWatermarkHold.remove(namespace);
+    if (currentHold != null) {
+      watermarkHolds.remove(currentHold);
+    }
+  }
+
+  /**
+   * Registers a hold for {@code namespace}. If the namespace already has a hold, it is replaced
+   * only when the new hold is earlier.
+   *
+   * @param namespace the namespace that owns the hold
+   * @param watermarkHold the instant to hold the output watermark at
+   */
+  @Override
+  public void addWatermarkHold(String namespace, Instant watermarkHold) {
+    Instant currentHold = namespaceToWatermarkHold.get(namespace);
+    if (currentHold == null) {
+      namespaceToWatermarkHold.put(namespace, watermarkHold);
+      watermarkHolds.add(watermarkHold);
+    } else if (watermarkHold.isBefore(currentHold)) {
+      namespaceToWatermarkHold.put(namespace, watermarkHold);
+      watermarkHolds.add(watermarkHold);
+      watermarkHolds.remove(currentHold);
+    }
+  }
+
+  /**
+   * Registers an event-time timer for {@code key} on behalf of {@code doFnExecutor}.
+   *
+   * @param key the key the timer belongs to
+   * @param timerData the timer to register; must be in the event-time domain
+   * @param doFnExecutor the executor whose {@code onTimer} is called when the timer fires
+   * @throws IllegalArgumentException if the timer is not an event-time timer
+   */
+  @Override
+  public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
+    checkArgument(
+        TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
+        "Does not support domain: %s.", timerData.getDomain());
+    Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
+    if (keyedExecutors == null) {
+      // First registration of this timer: queue it once and start its callback set.
+      keyedExecutors = Sets.newHashSet();
+      timerDataToKeyedExecutors.put(timerData, keyedExecutors);
+      eventTimeTimersQueue.add(timerData);
+    }
+    keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
new file mode 100644
index 0000000..edd3d8a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Interface for classes capable of transforming Beam PTransforms into Storm primitives.
+ *
+ * @param <T> the type of {@link PTransform} handled by the translator
+ */
+interface TransformTranslator<T extends PTransform<?, ?>> {
+
+  /**
+   * Translates the given transform using the supplied translation context.
+   */
+  void translateNode(T transform, TranslationContext context);
+
+  /**
+   * Returns true if this translator can translate the given transform.
+   */
+  boolean canTranslate(T transform, TranslationContext context);
+
+  /**
+   * Default translator: {@link #translateNode} does nothing and {@link #canTranslate} always
+   * returns true.
+   *
+   * @param <T1> the type of {@link PTransform} handled by the translator
+   */
+  class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
+    @Override
+    public void translateNode(T1 transform, TranslationContext context) {
+      // No-op by default.
+    }
+
+    @Override
+    public boolean canTranslate(T1 transform, TranslationContext context) {
+      return true;
+    }
+
+    /**
+     * Builds a one-line description of a transform in the form
+     * {@code <input tag ids> --> <transform name> --> <output tag ids>}, where the tag ids on
+     * each side are joined with {@code '+'}.
+     *
+     * @param transform the transform being described; its name forms the middle section
+     * @param inputs the tagged inputs of the transform
+     * @param outputs the tagged outputs of the transform
+     * @return the formatted description
+     */
+    static String describeTransform(
+        PTransform<?, ?> transform,
+        Map<TupleTag<?>, PValue> inputs,
+        Map<TupleTag<?>, PValue> outputs) {
+      return String.format("%s --> %s --> %s",
+          joinTagIds(inputs),
+          transform.getName(),
+          joinTagIds(outputs));
+    }
+
+    /**
+     * Joins the ids of the tags of {@code taggedValues} with {@code '+'}, in the map's entry
+     * iteration order. Values are identified by their tag id, not by the PValue name.
+     */
+    private static String joinTagIds(Map<TupleTag<?>, PValue> taggedValues) {
+      return Joiner.on('+').join(FluentIterable.from(taggedValues.entrySet())
+          .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+            @Override
+            public String apply(Map.Entry<TupleTag<?>, PValue> taggedValue) {
+              return taggedValue.getKey().getId();
+            }
+          }));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 28d102d..b84fd4a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -34,12 +34,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.Executor;
-import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-import org.apache.beam.runners.jstorm.translation.translator.Stream;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.PValueBase;
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
index 316186e..9eaa13a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -19,15 +19,6 @@ package org.apache.beam.runners.jstorm.translation;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.translator.BoundedSourceTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.FlattenTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.GroupByKeyTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundMultiTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.UnboundedSourceTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.WindowAssignTranslator;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -40,7 +31,7 @@ import org.slf4j.LoggerFactory;
/**
* Lookup table mapping PTransform types to associated TransformTranslator implementations.
*/
-public class TranslatorRegistry {
+class TranslatorRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS =
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java
new file mode 100644
index 0000000..2159cfa
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
+import java.io.IOException;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transactional executors bolt handles the checkpoint and restore of state and timer.
+ *
+ * <p>All regular bolt callbacks are delegated to the wrapped {@link ExecutorsBolt}; the
+ * transactional callbacks persist and restore the bolt's {@link TimerService} through the
+ * KV store manager obtained from the wrapped bolt's executor context.
+ */
+public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
+
+  private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
+  private static final String TIMER_SERVICE_KEY = "timer_service_key";
+
+  private ExecutorsBolt executorsBolt;
+  private IKvStoreManager kvStoreManager;
+  private IKvStore<String, TimerService> timerServiceStore;
+
+  /**
+   * Wraps the given bolt and marks it as stateful.
+   *
+   * @param executorsBolt the bolt every non-transactional callback is delegated to
+   */
+  public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
+    this.executorsBolt = executorsBolt;
+    this.executorsBolt.setStatefulBolt(true);
+  }
+
+  /**
+   * Prepares the wrapped bolt, then looks up the KV store manager from its executor context and
+   * opens the timer service store.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if preparation fails
+   */
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    try {
+      executorsBolt.prepare(stormConf, context, collector);
+      kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
+      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+    } catch (IOException e) {
+      LOG.error("Failed to prepare stateful bolt", e);
+      // Keep the original exception as the cause so its stack trace is not lost.
+      throw new RuntimeException("Failed to prepare stateful bolt", e);
+    }
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    executorsBolt.execute(input);
+  }
+
+  @Override
+  public void cleanup() {
+    executorsBolt.cleanup();
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    executorsBolt.declareOutputFields(declarer);
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return executorsBolt.getComponentConfiguration();
+  }
+
+  /**
+   * Initializes this bolt from the given state by restoring from it.
+   */
+  @Override
+  public void initState(Object userState) {
+    LOG.info("Begin to init from state: {}", userState);
+    restore(userState);
+  }
+
+  /**
+   * Stores the wrapped bolt's current timer service and checkpoints the KV store manager for
+   * the given batch.
+   *
+   * @return always {@code null}
+   * @throws RuntimeException wrapping the {@link IOException} if the timer service cannot be
+   *     stored
+   */
+  @Override
+  public Object finishBatch(long batchId) {
+    try {
+      timerServiceStore.put(TIMER_SERVICE_KEY, executorsBolt.timerService());
+    } catch (IOException e) {
+      LOG.error("Failed to store current timer service status", e);
+      // Keep the original exception as the cause so its stack trace is not lost.
+      throw new RuntimeException("Failed to store current timer service status", e);
+    }
+    kvStoreManager.checkpoint(batchId);
+    return null;
+  }
+
+  /**
+   * Backs up the KV store manager for the given batch and returns the result of that backup.
+   */
+  @Override
+  public Object commit(long batchId, Object state) {
+    return kvStoreManager.backup(batchId);
+  }
+
+  /**
+   * Rolls this bolt back to the given state by restoring from it.
+   */
+  @Override
+  public void rollBack(Object userState) {
+    LOG.info("Begin to rollback from state: {}", userState);
+    restore(userState);
+  }
+
+  /**
+   * Removes the KV store manager's data for the acknowledged batch.
+   */
+  @Override
+  public void ackCommit(long batchId, long timeStamp) {
+    kvStoreManager.remove(batchId);
+  }
+
+  /**
+   * Restores the KV stores from {@code userState} and re-installs the timer service on the
+   * wrapped bolt. When no timer service was persisted, a new one is initialized by the bolt.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if restoring fails
+   */
+  private void restore(Object userState) {
+    try {
+      // restore all states
+      kvStoreManager.restore(userState);
+
+      // init timer service
+      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+      TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KEY);
+      if (timerService == null) {
+        timerService = executorsBolt.initTimerService();
+      }
+      executorsBolt.setTimerService(timerService);
+    } catch (IOException e) {
+      LOG.error("Failed to restore state", e);
+      // Keep the original exception as the cause so its stack trace is not lost.
+      throw new RuntimeException("Failed to restore state", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java
new file mode 100644
index 0000000..382cb50
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreManagerFactory;
+import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transactional unbounded source spout handles the checkpoint and restore of state and timer.
+ */
+public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
+
+ private static final String SOURCE_STORE_ID = "SourceCheckpoint";
+ private static final String CHECKPOINT_MARK = "CheckpointMark";
+
+ private UnboundedSourceSpout sourceSpout;
+ private UnboundedSource.UnboundedReader reader;
+ private IKvStoreManager kvStoreManager;
+ private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
+
+ public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
+ this.sourceSpout = sourceSpout;
+ }
+
+ private void restore(Object userState) {
+ try {
+ kvStoreManager.restore(userState);
+ sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
+ UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
+ sourceSpout.createSourceReader(checkpointMark);
+ reader = sourceSpout.getUnboundedSourceReader();
+ } catch (IOException e) {
+ LOG.error("Failed to init state", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void initState(Object userState) {
+ restore(userState);
+ }
+
+ @Override
+ public Object finishBatch(long checkpointId) {
+ try {
+ // Store check point mark from unbounded source reader
+ UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
+ sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
+
+ // checkpoint all kv stores in current manager
+ kvStoreManager.checkpoint(checkpointId);
+ } catch (IOException e) {
+ LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
+ throw new RuntimeException(e.getMessage());
+ }
+ return null;
+ }
+
+ @Override
+ public Object commit(long batchId, Object state) {
+ // backup kv stores to remote state backend
+ return kvStoreManager.backup(batchId);
+ }
+
+ @Override
+ public void rollBack(Object userState) {
+ restore(userState);
+ }
+
+ @Override
+ public void ackCommit(long batchId, long timeStamp) {
+ // remove obsolete state in bolt local and remote state backend
+ kvStoreManager.remove(batchId);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ sourceSpout.declareOutputFields(declarer);
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return sourceSpout.getComponentConfiguration();
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ try {
+ sourceSpout.open(conf, context, collector);
+ String storeName = String.format("task-%s", context.getThisTaskId());
+ String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+ kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+ context, storeName, storePath, true);
+
+ reader = sourceSpout.getUnboundedSourceReader();
+ } catch (IOException e) {
+ LOG.error("Failed to open transactional unbounded source spout", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() {
+ sourceSpout.close();
+ }
+
+ @Override
+ public void activate() {
+ sourceSpout.activate();
+ }
+
+ @Override
+ public void deactivate() {
+ sourceSpout.deactivate();
+ }
+
+ @Override
+ public void nextTuple() {
+ sourceSpout.nextTuple();
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ throw new UnsupportedOperationException();
+ }
+}