You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:03 UTC
[03/53] [abbrv] beam git commit: jstorm-runner: rename the package to
org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
new file mode 100644
index 0000000..f101beb
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import com.alibaba.jstorm.cache.IKvStore;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * JStorm implementation of {@link MapState} that delegates every operation to an
+ * {@link IKvStore}.
+ *
+ * <p>Any {@link IOException} raised by the store is logged and rethrown as a
+ * {@link RuntimeException} that keeps the original exception as its cause.
+ *
+ * @param <K> type of the map keys
+ * @param <V> type of the map values
+ */
+public class JStormMapState<K, V> implements MapState<K, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
+
+  // The key and namespace are retained from construction but are not consulted by any
+  // operation below; entries are addressed in the store by the map key alone.
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<K, V> kvStore;
+
+  /**
+   * Creates a map state on top of the given store.
+   *
+   * @param key the key this state was created for
+   * @param namespace the state namespace this state was created for
+   * @param kvStore the store that holds the map entries
+   */
+  public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+    this.key = key;
+    this.namespace = namespace;
+    this.kvStore = kvStore;
+  }
+
+  /**
+   * Associates {@code var2} with {@code var1} in the underlying store.
+   *
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public void put(K var1, V var2) {
+    try {
+      kvStore.put(var1, var2);
+    } catch (IOException e) {
+      throw storeFailure(String.format("Failed to put key=%s, value=%s", var1, var2), e);
+    }
+  }
+
+  /**
+   * Stores {@code var2} under {@code var1} only when the store currently returns {@code null}
+   * for that key.
+   *
+   * <p>The lookup and the write are two separate store calls; this method adds no
+   * synchronization of its own.
+   *
+   * @return a state holding the value found before the call, i.e. {@code null} when the new
+   *     value was written
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public ReadableState<V> putIfAbsent(K var1, V var2) {
+    try {
+      V existing = kvStore.get(var1);
+      if (existing == null) {
+        kvStore.put(var1, var2);
+      }
+      return new MapReadableState<>(existing);
+    } catch (IOException e) {
+      throw storeFailure(
+          String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
+    }
+  }
+
+  /**
+   * Removes the entry stored under {@code var1}.
+   *
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public void remove(K var1) {
+    try {
+      kvStore.remove(var1);
+    } catch (IOException e) {
+      throw storeFailure(String.format("Failed to remove key=%s", var1), e);
+    }
+  }
+
+  /**
+   * Reads the value stored under {@code var1}.
+   *
+   * @return a state holding whatever the store returned for the key
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public ReadableState<V> get(K var1) {
+    try {
+      return new MapReadableState<>(kvStore.get(var1));
+    } catch (IOException e) {
+      throw storeFailure(String.format("Failed to get value for key=%s", var1), e);
+    }
+  }
+
+  /**
+   * Reads all keys of the underlying store.
+   *
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public ReadableState<Iterable<K>> keys() {
+    try {
+      return new MapReadableState<>(kvStore.keys());
+    } catch (IOException e) {
+      throw storeFailure("Failed to get keys", e);
+    }
+  }
+
+  /**
+   * Reads all values of the underlying store.
+   *
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public ReadableState<Iterable<V>> values() {
+    try {
+      return new MapReadableState<>(kvStore.values());
+    } catch (IOException e) {
+      throw storeFailure("Failed to get values", e);
+    }
+  }
+
+  /**
+   * Reads all entries of the underlying store.
+   *
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+    try {
+      return new MapReadableState<>(kvStore.entries());
+    } catch (IOException e) {
+      // The message names "entries"; it previously repeated the text used by values().
+      throw storeFailure("Failed to get entries", e);
+    }
+  }
+
+  /**
+   * Removes every key currently reported by the store, using a single batch removal.
+   *
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public void clear() {
+    try {
+      Iterable<K> keys = kvStore.keys();
+      kvStore.removeBatch(keys);
+    } catch (IOException e) {
+      throw storeFailure("Failed to clear map state", e);
+    }
+  }
+
+  /**
+   * Logs a store failure and builds the unchecked exception the caller should throw.
+   *
+   * <p>The {@link IOException} is attached as the cause so the root failure is not lost.
+   */
+  private RuntimeException storeFailure(String errorInfo, IOException e) {
+    LOG.error(errorInfo, e);
+    return new RuntimeException(errorInfo, e);
+  }
+
+  /**
+   * A {@link ReadableState} over a value that has already been read from the store.
+   *
+   * <p>Static because it needs no reference to the enclosing map state.
+   */
+  private static final class MapReadableState<T> implements ReadableState<T> {
+    private final T value;
+
+    MapReadableState(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public T read() {
+      return value;
+    }
+
+    @Override
+    public ReadableState<T> readLater() {
+      // The value is already materialized, so there is nothing to prefetch.
+      return this;
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
new file mode 100644
index 0000000..8a0cb73
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateBinder;
+import org.apache.beam.sdk.state.StateContext;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * JStorm implementation of {@link StateInternals}.
+ */
+public class JStormStateInternals<K> implements StateInternals {
+
+ private static final String STATE_INFO = "state-info:";
+
+ @Nullable
+ private final K key;
+ private final IKvStoreManager kvStoreManager;
+ private final TimerService timerService;
+ private final int executorId;
+
+ public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
+ TimerService timerService, int executorId) {
+ this.key = key;
+ this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
+ this.timerService = checkNotNull(timerService, "timerService");
+ this.executorId = executorId;
+ }
+
+ @Nullable
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public <T extends State> T state(
+ StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
+ // throw new UnsupportedOperationException("StateContext is not supported.");
+ /**
+ * TODOļ¼
+ * Same implementation as state() which is without StateContext. This might be updated after
+ * we figure out if we really need StateContext for JStorm state internals.
+ */
+ return state(namespace, address);
+ }
+
+ @Override
+ public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
+ return address.getSpec().bind(address.getId(), new StateBinder() {
+ @Override
+ public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
+ try {
+ return new JStormValueState<>(
+ getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
+ } catch (IOException e) {
+ throw new RuntimeException();
+ }
+ }
+
+ @Override
+ public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
+ try {
+ return new JStormBagState(
+ getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
+ kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+ } catch (IOException e) {
+ throw new RuntimeException();
+ }
+ }
+
+ @Override
+ public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+ String id,
+ StateSpec<MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder) {
+ try {
+ return new JStormMapState<>(getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public <InputT, AccumT, OutputT> CombiningState bindCombining(
+ String id,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ try {
+ BagState<AccumT> accumBagState = new JStormBagState(
+ getKey(), namespace,
+ kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
+ kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+ return new JStormCombiningState<>(accumBagState, combineFn);
+ } catch (IOException e) {
+ throw new RuntimeException();
+ }
+ }
+
+
+ @Override
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
+ bindCombiningWithContext(
+ String id,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public WatermarkHoldState bindWatermark(
+ String id,
+ StateSpec<WatermarkHoldState> spec,
+ final TimestampCombiner timestampCombiner) {
+ try {
+ BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
+ getKey(), namespace,
+ kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
+ kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+
+ Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
+ new BinaryCombineFn<Instant>() {
+ @Override
+ public Instant apply(Instant left, Instant right) {
+ return timestampCombiner.combine(left, right);
+ }};
+ return new JStormWatermarkHoldState(
+ namespace,
+ new JStormCombiningState<>(
+ accumBagState,
+ outputTimeCombineFn),
+ timestampCombiner,
+ timerService);
+ } catch (IOException e) {
+ throw new RuntimeException();
+ }
+ }
+ });
+ }
+
+ private String getStoreId(String stateId) {
+ return String.format("%s-%s", stateId, executorId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
new file mode 100644
index 0000000..5ad3663
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
+
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.ValueState;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * JStorm implementation of {@link ValueState}.
+ *
+ * <p>The value is stored in an {@link IKvStore} under a {@link ComposedKey} built from the key
+ * and the state namespace. Store failures are rethrown as {@link RuntimeException} with the
+ * original {@link IOException} attached as the cause.
+ *
+ * @param <K> type of the key the state is scoped to
+ * @param <T> type of the stored value
+ */
+public class JStormValueState<K, T> implements ValueState<T> {
+
+  @Nullable
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<ComposedKey, T> kvState;
+
+  /**
+   * @param key the key the state is scoped to; may be {@code null}
+   * @param namespace the state namespace, used as the second part of the composed key
+   * @param kvState the store holding the value
+   */
+  JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+    this.key = key;
+    this.namespace = namespace;
+    this.kvState = kvState;
+  }
+
+  /**
+   * Stores {@code t} under the composed key.
+   *
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public void write(T t) {
+    try {
+      kvState.put(getComposedKey(), t);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t), e);
+    }
+  }
+
+  /**
+   * Reads the value stored under the composed key.
+   *
+   * @return whatever the store returns for the composed key
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public T read() {
+    try {
+      return kvState.get(getComposedKey());
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to read key: %s, namespace: %s.", key, namespace), e);
+    }
+  }
+
+  @Override
+  public ValueState<T> readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  /**
+   * Removes the value stored under the composed key.
+   *
+   * @throws RuntimeException if the store fails with an {@link IOException}
+   */
+  @Override
+  public void clear() {
+    try {
+      kvState.remove(getComposedKey());
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to clear key: %s, namespace: %s.", key, namespace), e);
+    }
+  }
+
+  /** Builds the store key from this state's key and namespace. */
+  private ComposedKey getComposedKey() {
+    return ComposedKey.of(key, namespace);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
new file mode 100644
index 0000000..659d77c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.GroupingState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+/**
+ * JStorm implementation of {@link WatermarkHoldState}.
+ *
+ * <p>Holds are recorded twice: in the {@link TimerService}, keyed by the string form of the
+ * state namespace, and in a {@link GroupingState} that this class delegates reads to.
+ */
+public class JStormWatermarkHoldState implements WatermarkHoldState {
+
+  private final StateNamespace namespace;
+  private final GroupingState<Instant, Instant> watermarkHoldsState;
+  private final TimestampCombiner timestampCombiner;
+  private final TimerService timerService;
+
+  /**
+   * Creates a hold state; every argument must be non-null.
+   *
+   * @param namespace namespace whose string key identifies the hold in the timer service
+   * @param watermarkHoldsState state that stores the added holds
+   * @param timestampCombiner combiner reported by {@link #getTimestampCombiner()}
+   * @param timerService service notified when holds are added or cleared
+   */
+  JStormWatermarkHoldState(
+      StateNamespace namespace,
+      GroupingState<Instant, Instant> watermarkHoldsState,
+      TimestampCombiner timestampCombiner,
+      TimerService timerService) {
+    this.namespace = checkNotNull(namespace, "namespace");
+    this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
+    this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
+    this.timerService = checkNotNull(timerService, "timerService");
+  }
+
+  @Override
+  public TimestampCombiner getTimestampCombiner() {
+    return timestampCombiner;
+  }
+
+  /** Registers the hold with the timer service first, then records it in the holds state. */
+  @Override
+  public void add(Instant instant) {
+    final String holdKey = namespace.stringKey();
+    timerService.addWatermarkHold(holdKey, instant);
+    watermarkHoldsState.add(instant);
+  }
+
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return watermarkHoldsState.isEmpty();
+  }
+
+  @Override
+  public Instant read() {
+    return watermarkHoldsState.read();
+  }
+
+  @Override
+  public WatermarkHoldState readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  /** Clears the hold in the timer service first, then clears the holds state. */
+  @Override
+  public void clear() {
+    final String holdKey = namespace.stringKey();
+    timerService.clearWatermarkHold(holdKey);
+    watermarkHoldsState.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
new file mode 100644
index 0000000..4b5f83c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.timer;
+
+import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * JStorm implementation of {@link TimerInternals}.
+ */
+public class JStormTimerInternals<K> implements TimerInternals {
+
+ private final K key;
+ private final DoFnExecutor<?, ?> doFnExecutor;
+ private final TimerService timerService;
+
+
+ public JStormTimerInternals(@Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
+ this.key = key;
+ this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
+ this.timerService = checkNotNull(timerService, "timerService");
+ }
+
+ @Override
+ public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+ setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+ }
+
+ @Override
+ @Deprecated
+ public void setTimer(TimerData timerData) {
+ timerService.setTimer(key, timerData, doFnExecutor);
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer is not yet supported.");
+ }
+
+ @Override
+ @Deprecated
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer is not yet supported.");
+ }
+
+ @Override
+ @Deprecated
+ public void deleteTimer(TimerData timerData) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer is not yet supported.");
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return Instant.now();
+ }
+
+ @Override
+ @Nullable
+ public Instant currentSynchronizedProcessingTime() {
+ return null;
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return new Instant(timerService.currentInputWatermark());
+ }
+
+ @Override
+ @Nullable
+ public Instant currentOutputWatermarkTime() {
+ return new Instant(timerService.currentOutputWatermark());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
new file mode 100644
index 0000000..9651fc2
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
+
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Read.Bounded} into a Storm spout.
+ *
+ * <p>The bounded source is wrapped in a
+ * {@link UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter} and registered as an
+ * {@link UnboundedSourceSpout}.
+ *
+ * @param <T> element type produced by the bounded read
+ */
+public class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
+
+  /**
+   * Registers a spout for {@code transform} on the execution graph, tagged with the user graph's
+   * current output tag and output value.
+   */
+  @Override
+  public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraph = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraph.getInputs(), userGraph.getOutputs());
+
+    TupleTag<?> outputTag = userGraph.getOutputTag();
+    PValue outputValue = userGraph.getOutput();
+
+    // The spout only consumes unbounded sources, so the bounded source is adapted first.
+    UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter adaptedSource =
+        new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource());
+    UnboundedSourceSpout spout =
+        new UnboundedSourceSpout(description, adaptedSource, userGraph.getOptions(), outputTag);
+
+    TaggedPValue taggedOutput = TaggedPValue.of(outputTag, outputValue);
+    context.getExecutionGraphContext().registerSpout(spout, taggedOutput);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
new file mode 100644
index 0000000..c4da58a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Translator for {@link Combine.Globally}.
+ *
+ * <p>The class body is empty: it declares no overrides of its own and therefore relies entirely
+ * on what {@code TransformTranslator.Default} provides.
+ *
+ * @param <InputT> input element type of the combine
+ * @param <OutputT> output type of the combine
+ */
+public class CombineGloballyTranslator<InputT, OutputT> extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
new file mode 100644
index 0000000..99cbff7
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Translator for {@link Combine.PerKey}.
+ *
+ * <p>The class body is empty: it declares no overrides of its own and therefore relies entirely
+ * on what {@code TransformTranslator.Default} provides.
+ *
+ * @param <K> key type of the combine
+ * @param <InputT> input value type of the combine
+ * @param <OutputT> output value type of the combine
+ */
+public class CombinePerKeyTranslator<K, InputT, OutputT> extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
new file mode 100644
index 0000000..4558216
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import com.google.common.collect.Maps;
+import org.apache.beam.sdk.transforms.Flatten;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.FlattenExecutor;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Map;
+
+/**
+ * Translates a {@link Flatten.PCollections} into a {@link FlattenExecutor}.
+ *
+ * @param <V> element type of the flattened collections
+ */
+public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
+
+  /**
+   * Expands every input of the current user graph node and registers a flatten executor that
+   * consumes the expanded inputs.
+   */
+  @Override
+  public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraph = context.getUserGraphContext();
+
+    // Since a new tag is created in PCollectionList, retrieve the real tag here.
+    Map<TupleTag<?>, PValue> realInputs = Maps.newHashMap();
+    for (PValue listedInput : userGraph.getInputs().values()) {
+      PCollection<V> collection = (PCollection<V>) listedInput;
+      realInputs.putAll(collection.expand());
+    }
+
+    // NOTE(review): these two lines write debugging output straight to stdout; consider
+    // routing them through the project's logger instead.
+    System.out.println("Real inputs: " + realInputs);
+    System.out.println("FlattenList inputs: " + userGraph.getInputs());
+
+    String description = describeTransform(transform, realInputs, userGraph.getOutputs());
+    FlattenExecutor executor = new FlattenExecutor(description, userGraph.getOutputTag());
+    context.addTransformExecutor(executor, realInputs, userGraph.getOutputs());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
new file mode 100644
index 0000000..6b8297b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
+import com.google.common.collect.Lists;
+import org.apache.beam.sdk.transforms.GroupByKey;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Translates a {@link GroupByKey} into a {@link GroupByWindowExecutor}.
+ */
+public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
+  // Details of the transform handled by the latest translateNode call.
+  protected PCollection<KV<K, V>> input;
+  protected PCollection<KV<K, Iterable<V>>> output;
+  protected List<TupleTag<?>> inputTags;
+  protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
+  protected List<TupleTag<?>> sideOutputTags;
+  protected List<PCollectionView<?>> sideInputs;
+  protected WindowingStrategy<?, ?> windowingStrategy;
+
+  /**
+   * Records the transform's input, output, tags and windowing strategy on this translator and
+   * registers a {@link GroupByWindowExecutor} built from them.
+   *
+   * @param transform the group-by-key transform being translated
+   * @param context the translation context that receives the executor
+   */
+  @Override
+  public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+    final TranslationContext.UserGraphContext graph = context.getUserGraphContext();
+    final String stepDescription =
+        describeTransform(transform, graph.getInputs(), graph.getOutputs());
+
+    this.input = (PCollection<KV<K, V>>) graph.getInput();
+    this.output = (PCollection<KV<K, Iterable<V>>>) graph.getOutput();
+    this.inputTags = graph.getInputTags();
+    this.mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) graph.getOutputTag();
+
+    // No side outputs or side inputs are configured for this translation.
+    this.sideOutputTags = Lists.newArrayList();
+    this.sideInputs = Collections.emptyList();
+    this.windowingStrategy = this.input.getWindowingStrategy();
+
+    context.addTransformExecutor(
+        new GroupByWindowExecutor<K, V>(
+            graph.getStepName(),
+            stepDescription,
+            context,
+            context.getUserGraphContext().getOptions(),
+            this.windowingStrategy,
+            this.mainOutputTag,
+            this.sideOutputTags));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
new file mode 100644
index 0000000..c487578
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import avro.shaded.com.google.common.collect.Maps;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}.
+ */
+public class ParDoBoundMultiTranslator<InputT, OutputT>
+ extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
+
+ @Override
+ public void translateNode(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+ final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
+ PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+
+ Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
+ Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
+ for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
+ Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
+ localToExternalTupleTagMap.put(entry.getKey(), itr.next());
+ }
+
+ TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+ List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
+ sideOutputTags.remove(mainOutputTag);
+
+ Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
+ for (PCollectionView pCollectionView : transform.getSideInputs()) {
+ allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+ }
+ String description = describeTransform(
+ transform,
+ allInputs,
+ allOutputs);
+
+ ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+ for (PCollectionView pCollectionView : transform.getSideInputs()) {
+ sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+ }
+
+ DoFnExecutor executor;
+ DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+ if (signature.stateDeclarations().size() > 0
+ || signature.timerDeclarations().size() > 0) {
+ executor = new MultiStatefulDoFnExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ userGraphContext.getOptions(),
+ (DoFn<KV, OutputT>) transform.getFn(),
+ (Coder) WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+ input.getWindowingStrategy(),
+ (TupleTag<KV>) inputTag,
+ transform.getSideInputs(),
+ sideInputTagToView.build(),
+ mainOutputTag,
+ sideOutputTags,
+ localToExternalTupleTagMap);
+ } else {
+ executor = new MultiOutputDoFnExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ userGraphContext.getOptions(),
+ transform.getFn(),
+ WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+ input.getWindowingStrategy(),
+ inputTag,
+ transform.getSideInputs(),
+ sideInputTagToView.build(),
+ mainOutputTag,
+ sideOutputTags,
+ localToExternalTupleTagMap);
+ }
+
+ context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
new file mode 100644
index 0000000..3a952a9
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import java.util.List;
+import java.util.Map;
+
+import avro.shaded.com.google.common.collect.Lists;
+import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}.
+ */
+public class ParDoBoundTranslator<InputT, OutputT>
+ extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
+
+ @Override
+ public void translateNode(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
+ final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ final TupleTag<?> inputTag = userGraphContext.getInputTag();
+ PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+
+ TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+ List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
+
+ Map<TupleTag<?>, PValue> allInputs = avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
+ for (PCollectionView pCollectionView : transform.getSideInputs()) {
+ allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+ }
+ String description = describeTransform(
+ transform,
+ allInputs,
+ userGraphContext.getOutputs());
+
+ ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+ for (PCollectionView pCollectionView : transform.getSideInputs()) {
+ sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+ }
+
+ DoFnExecutor executor;
+ DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+ if (signature.stateDeclarations().size() > 0
+ || signature.timerDeclarations().size() > 0) {
+ executor = new StatefulDoFnExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ userGraphContext.getOptions(),
+ (DoFn<KV, OutputT>) transform.getFn(),
+ (Coder) WindowedValue.getFullCoder(
+ input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+ input.getWindowingStrategy(),
+ (TupleTag<KV>) inputTag,
+ transform.getSideInputs(),
+ sideInputTagToView.build(),
+ mainOutputTag,
+ sideOutputTags);
+ } else {
+ executor = new DoFnExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ userGraphContext.getOptions(),
+ transform.getFn(),
+ WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+ input.getWindowingStrategy(),
+ (TupleTag<InputT>) inputTag,
+ transform.getSideInputs(),
+ sideInputTagToView.build(),
+ mainOutputTag,
+ sideOutputTags);
+ }
+
+ context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
new file mode 100644
index 0000000..1ef1ec3
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.transforms.Reshuffle;
+
+/**
+ * Translator registered for {@link Reshuffle}. It declares no members of its own, so it uses the
+ * {@code translateNode} and {@code canTranslate} implementations inherited from
+ * {@link TransformTranslator.Default}.
+ */
+public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K,V>> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
new file mode 100644
index 0000000..9f69391
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import com.google.auto.value.AutoValue;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Class that defines the stream connection between upstream and downstream components.
+ *
+ * <p>Instances are created through the static factories. These delegate to the AutoValue
+ * generated implementations, which live in the same package as this class and are therefore
+ * referenced by their simple names.
+ */
+@AutoValue
+public abstract class Stream {
+
+  /** Returns the upstream end of this stream. */
+  public abstract Producer getProducer();
+
+  /** Returns the downstream end of this stream. */
+  public abstract Consumer getConsumer();
+
+  /**
+   * Creates a stream connecting the given producer to the given consumer.
+   *
+   * @param producer the upstream end
+   * @param consumer the downstream end
+   */
+  public static Stream of(Producer producer, Consumer consumer) {
+    return new AutoValue_Stream(producer, consumer);
+  }
+
+  /**
+   * The producing side of a stream: a component id plus the id and name of the stream.
+   */
+  @AutoValue
+  public abstract static class Producer {
+    public abstract String getComponentId();
+    public abstract String getStreamId();
+    public abstract String getStreamName();
+
+    /** Creates a producer from its component id, stream id and stream name. */
+    public static Producer of(String componentId, String streamId, String streamName) {
+      return new AutoValue_Stream_Producer(componentId, streamId, streamName);
+    }
+  }
+
+  /**
+   * The consuming side of a stream: a component id plus the grouping it subscribes with.
+   */
+  @AutoValue
+  public abstract static class Consumer {
+    public abstract String getComponentId();
+    public abstract Grouping getGrouping();
+
+    /** Creates a consumer from its component id and grouping. */
+    public static Consumer of(String componentId, Grouping grouping) {
+      return new AutoValue_Stream_Consumer(componentId, grouping);
+    }
+  }
+
+  /**
+   * A stream grouping: its {@link Type} and, for {@link Type#FIELDS}, the key fields.
+   */
+  @AutoValue
+  public abstract static class Grouping {
+    public abstract Type getType();
+
+    /** Returns the key fields; {@code null} for groupings created through {@link #of(Type)}. */
+    @Nullable
+    public abstract List<String> getFields();
+
+    /**
+     * Creates a grouping without key fields.
+     *
+     * @param type any grouping type except {@link Type#FIELDS}
+     * @throws IllegalArgumentException if {@code type} is {@link Type#FIELDS}
+     */
+    public static Grouping of(Type type) {
+      checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
+      return new AutoValue_Stream_Grouping(type, null /* fields */);
+    }
+
+    /**
+     * Creates a {@link Type#FIELDS} grouping over the given key fields.
+     *
+     * @param fields the key fields; must be non-null and non-empty
+     * @throws NullPointerException if {@code fields} is null
+     * @throws IllegalArgumentException if {@code fields} is empty
+     */
+    public static Grouping byFields(List<String> fields) {
+      checkNotNull(fields, "fields");
+      checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
+      return new AutoValue_Stream_Grouping(Type.FIELDS, fields);
+    }
+
+    /**
+     * Types of stream groupings Storm allows.
+     */
+    public enum Type {
+      ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
new file mode 100644
index 0000000..bebdf7b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Map;
+
+/**
+ * Interface for classes capable of transforming Beam PTransforms into Storm primitives.
+ */
+public interface TransformTranslator<T extends PTransform<?, ?>> {
+
+  /**
+   * Translates the given transform, using the given translation context.
+   */
+  void translateNode(T transform, TranslationContext context);
+
+  /**
+   * Returns true if this translator can translate the given transform.
+   */
+  boolean canTranslate(T transform, TranslationContext context);
+
+  /**
+   * Base implementation: {@link #translateNode} does nothing and {@link #canTranslate} always
+   * returns true. Also provides {@link #describeTransform} for building descriptions.
+   */
+  class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
+    @Override
+    public void translateNode(T1 transform, TranslationContext context) {
+      // Intentionally empty; subclasses override this to do the actual translation.
+    }
+
+    @Override
+    public boolean canTranslate(T1 transform, TranslationContext context) {
+      return true;
+    }
+
+    /**
+     * Builds a description of the form {@code inputs --> name --> outputs}, where inputs and
+     * outputs are the ids of the respective tuple tags joined with {@code '+'}.
+     *
+     * @param transform the transform whose name appears in the middle of the description
+     * @param inputs the tagged inputs of the transform
+     * @param outputs the tagged outputs of the transform
+     * @return the formatted description
+     */
+    static String describeTransform(
+        PTransform<?, ?> transform,
+        Map<TupleTag<?>, PValue> inputs,
+        Map<TupleTag<?>, PValue> outputs) {
+      return String.format("%s --> %s --> %s",
+          joinTagIds(inputs),
+          transform.getName(),
+          joinTagIds(outputs));
+    }
+
+    /** Joins the ids of the map's tuple tags with {@code '+'}, in the map's iteration order. */
+    private static String joinTagIds(Map<TupleTag<?>, PValue> taggedValues) {
+      return Joiner.on('+').join(
+          FluentIterable.from(taggedValues.entrySet())
+              .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+                @Override
+                public String apply(Map.Entry<TupleTag<?>, PValue> taggedValue) {
+                  return taggedValue.getKey().getId();
+                }
+              }));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
new file mode 100644
index 0000000..ac7d7bd
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
+
+/**
+ * Translates a Read.Unbounded into a Storm spout.
+ *
+ * @param <T> the type parameter of the translated {@link Read.Unbounded}
+ */
+public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
+
+  /**
+   * Creates an {@link UnboundedSourceSpout} for the transform's source and registers it on the
+   * execution graph under the transform's output tag and output value.
+   *
+   * @param transform the unbounded read being translated
+   * @param context the translation context whose execution graph receives the spout
+   */
+  @Override
+  public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+
+    TupleTag<?> tag = userGraphContext.getOutputTag();
+    PValue output = userGraphContext.getOutput();
+
+    UnboundedSourceSpout spout = new UnboundedSourceSpout(
+        description,
+        transform.getSource(), userGraphContext.getOptions(), tag);
+    context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
new file mode 100644
index 0000000..0ebf837
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.ViewExecutor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
+ */
+public class ViewTranslator extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
+ /**
+  * Registers a {@link ViewExecutor} for the view-creation transform, labelled with the
+  * transform's description and bound to the current output tag.
+  */
+ @Override
+ public void translateNode(CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
+   final TranslationContext.UserGraphContext graph = context.getUserGraphContext();
+   final String label = describeTransform(transform, graph.getInputs(), graph.getOutputs());
+   context.addTransformExecutor(new ViewExecutor(label, graph.getOutputTag()));
+ }
+
+ /**
+  * Specialized implementation for
+  * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+  * for the JStorm runner in streaming mode.
+  */
+ public static class ViewAsMap<K, V>
+     extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+   /**
+    * Builds an instance of this class from the overridden transform.
+    */
+   @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+   public ViewAsMap(View.AsMap<K, V> transform) {
+   }
+
+   /**
+    * Creates the map view for {@code input}, then concatenates the input globally and wraps the
+    * result in a {@link CreateJStormPCollectionView}. A non-deterministic key coder is reported
+    * with a warning but does not abort the expansion.
+    */
+   @Override
+   public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+     PCollectionView<Map<K, V>> view =
+         PCollectionViews.mapView(
+             input,
+             input.getWindowingStrategy(),
+             input.getCoder());
+
+     @SuppressWarnings({"rawtypes", "unchecked"})
+     KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+     try {
+       inputCoder.getKeyCoder().verifyDeterministic();
+     } catch (Coder.NonDeterministicException e) {
+       // Best effort: surface the problem instead of swallowing it, but keep expanding.
+       org.slf4j.LoggerFactory.getLogger(ViewAsMap.class).warn(
+           "The key coder " + inputCoder.getKeyCoder()
+               + " of the View.AsMap input is not deterministic.", e);
+     }
+
+     return input
+         .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+         .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingViewAsMap";
+   }
+ }
+
+ /**
+  * Specialized expansion for {@link
+  * View.AsMultimap View.AsMultimap} for the
+  * JStorm runner in streaming mode.
+  */
+ public static class ViewAsMultimap<K, V>
+     extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+   /**
+    * Builds an instance of this class from the overridden transform.
+    */
+   @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+   public ViewAsMultimap(View.AsMultimap<K, V> transform) {
+   }
+
+   /**
+    * Creates the multimap view for {@code input}, then concatenates the input globally and wraps
+    * the result in a {@link CreateJStormPCollectionView}. A non-deterministic key coder is
+    * reported with a warning but does not abort the expansion.
+    */
+   @Override
+   public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+     PCollectionView<Map<K, Iterable<V>>> view =
+         PCollectionViews.multimapView(
+             input,
+             input.getWindowingStrategy(),
+             input.getCoder());
+
+     @SuppressWarnings({"rawtypes", "unchecked"})
+     KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+     try {
+       inputCoder.getKeyCoder().verifyDeterministic();
+     } catch (Coder.NonDeterministicException e) {
+       // Best effort: surface the problem instead of swallowing it, but keep expanding.
+       org.slf4j.LoggerFactory.getLogger(ViewAsMultimap.class).warn(
+           "The key coder " + inputCoder.getKeyCoder()
+               + " of the View.AsMultimap input is not deterministic.", e);
+     }
+
+     return input
+         .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+         .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingViewAsMultimap";
+   }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link View.AsList View.AsList} for the
+ * JStorm runner in streaming mode.
+ *
+ * <p>The expansion builds a list view over the input and feeds the globally combined input
+ * into a {@code CreateJStormPCollectionView} for that view.
+ */
+ public static class ViewAsList<T>
+ extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ * The argument is not stored.
+ */
+ @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+ public ViewAsList(View.AsList<T> transform) {}
+
+ /**
+ * Expands this transform for the given input.
+ *
+ * @param input the collection to expose as a list view
+ * @return the result of applying the view-creation transform to the combined input
+ */
+ @Override
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
+ // The view is defined with the input's own windowing strategy and coder.
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ // Combine globally without a default value, then attach the view.
+ // NOTE(review): Concatenate is declared outside this block; presumably it gathers all
+ // elements into a single list - confirm against its definition.
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
+ }
+
+ /** Returns the fixed kind string {@code "StreamingViewAsList"}. */
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsList";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link View.AsIterable View.AsIterable} for the
+ * JStorm runner in streaming mode.
+ *
+ * <p>The expansion builds an iterable view over the input and feeds the globally combined
+ * input into a {@code CreateJStormPCollectionView} for that view.
+ */
+ public static class ViewAsIterable<T>
+ extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ * The argument is not stored.
+ */
+ @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+ public ViewAsIterable(View.AsIterable<T> transform) { }
+
+ /**
+ * Expands this transform for the given input.
+ *
+ * @param input the collection to expose as an iterable view
+ * @return the result of applying the view-creation transform to the combined input
+ */
+ @Override
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+ // The view is defined with the input's own windowing strategy and coder.
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ // Combine globally without a default value, then attach the view.
+ // NOTE(review): Concatenate is declared outside this block; presumably it gathers all
+ // elements into a single list - confirm against its definition.
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
+ }
+
+ /** Returns the fixed kind string {@code "StreamingViewAsIterable"}. */
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsIterable";
+ }
+ }
+
+ /**
+  * Replacement expansion for {@link View.AsSingleton View.AsSingleton} used by the
+  * JStorm runner in streaming mode.
+  *
+  * <p>The input is reduced with a combine function that refuses to merge two values,
+  * and the result is exposed through {@code asSingletonView()}.
+  */
+ public static class ViewAsSingleton<T>
+     extends PTransform<PCollection<T>, PCollectionView<T>> {
+   private View.AsSingleton<T> transform;
+
+   /**
+    * Builds an instance of this class from the overridden transform.
+    *
+    * @param transform the original transform, consulted for its default-value settings
+    */
+   @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+   public ViewAsSingleton(View.AsSingleton<T> transform) {
+     this.transform = transform;
+   }
+
+   @Override
+   public PCollectionView<T> expand(PCollection<T> input) {
+     SingletonCombine<T> singletonFn =
+         new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue());
+     Combine.Globally<T, T> globally = Combine.globally(singletonFn);
+
+     // Defaults are disabled on the combine when the original transform has no default value.
+     Combine.Globally<T, T> configured =
+         transform.hasDefaultValue() ? globally : globally.withoutDefaults();
+     return input.apply(configured.asSingletonView());
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingViewAsSingleton";
+   }
+
+   /**
+    * Combine function that never merges two values: {@link #apply} always throws, and
+    * {@link #identity} returns the configured default value or throws when there is none.
+    */
+   private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+     private boolean hasDefaultValue;
+     private T defaultValue;
+
+     SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+       this.hasDefaultValue = hasDefaultValue;
+       this.defaultValue = defaultValue;
+     }
+
+     /**
+      * Always fails: being asked to combine two values is reported as a collection with
+      * more than one element being used as a singleton view.
+      *
+      * @throws IllegalArgumentException unconditionally
+      */
+     @Override
+     public T apply(T left, T right) {
+       String message = "PCollection with more than one element "
+           + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+           + "combine the PCollection into a single value";
+       throw new IllegalArgumentException(message);
+     }
+
+     /**
+      * Returns the default value when one was configured.
+      *
+      * @throws IllegalArgumentException if no default value was configured
+      */
+     @Override
+     public T identity() {
+       if (!hasDefaultValue) {
+         throw new IllegalArgumentException(
+             "Empty PCollection accessed as a singleton view. "
+                 + "Consider setting withDefault to provide a default value");
+       }
+       return defaultValue;
+     }
+   }
+ }
+
+ /**
+  * Replacement for {@link Combine.GloballyAsSingletonView} used by the JStorm runner:
+  * the input is combined globally (without defaults, using the original transform's
+  * fanout), each combined value is wrapped in a one-element list, and the result is
+  * handed to {@link CreateJStormPCollectionView} together with a singleton view.
+  */
+ public static class CombineGloballyAsSingletonView<InputT, OutputT>
+     extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+   Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+   /**
+    * Builds an instance of this class from the overridden transform.
+    *
+    * @param transform the original transform; its combine function, fanout and
+    *     insert-default flag are read during expansion
+    */
+   @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+   public CombineGloballyAsSingletonView(
+       Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+     this.transform = transform;
+   }
+
+   @Override
+   public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+     PCollection<OutputT> combinedValues =
+         input.apply(Combine.globally(transform.getCombineFn())
+             .withoutDefaults()
+             .withFanout(transform.getFanout()));
+
+     // The view carries the combine function's default value only when the original
+     // transform asked for a default to be inserted.
+     boolean insertDefault = transform.getInsertDefault();
+     OutputT viewDefault = insertDefault ? transform.getCombineFn().defaultValue() : null;
+     PCollectionView<OutputT> singletonView =
+         PCollectionViews.singletonView(
+             combinedValues,
+             combinedValues.getWindowingStrategy(),
+             insertDefault,
+             viewDefault,
+             combinedValues.getCoder());
+
+     PCollection<List<OutputT>> wrapped =
+         combinedValues.apply(ParDo.of(new WrapAsList<OutputT>()));
+     return wrapped.apply(CreateJStormPCollectionView.<OutputT, OutputT>of(singletonView));
+   }
+
+   @Override
+   protected String getKindString() {
+     return "StreamingCombineGloballyAsSingletonView";
+   }
+ }
+
+ /** {@link DoFn} that outputs each input element wrapped in a one-element {@link List}. */
+ private static class WrapAsList<T> extends DoFn<T, List<T>> {
+   @ProcessElement
+   public void processElement(ProcessContext c) {
+     T element = c.element();
+     List<T> wrapped = Collections.singletonList(element);
+     c.output(wrapped);
+   }
+ }
+
+ /**
+  * Combiner that collects {@code T}s into a single {@code List<T>} containing all inputs.
+  * It requires that the input {@link PCollection} fits in memory.
+  * For a large {@link PCollection} this is expected to crash!
+  *
+  * @param <T> the type of elements to concatenate.
+  */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+   private static final long serialVersionUID = 1L;
+
+   /** Starts from an empty, mutable list. */
+   @Override
+   public List<T> createAccumulator() {
+     return new ArrayList<>();
+   }
+
+   /** Appends {@code input} to the given accumulator in place and returns that same list. */
+   @Override
+   public List<T> addInput(List<T> accumulator, T input) {
+     accumulator.add(input);
+     return accumulator;
+   }
+
+   /** Copies the contents of every accumulator, in iteration order, into a fresh list. */
+   @Override
+   public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+     List<T> merged = createAccumulator();
+     for (List<T> partial : accumulators) {
+       merged.addAll(partial);
+     }
+     return merged;
+   }
+
+   /** The accumulator list is itself the output; no copy is made. */
+   @Override
+   public List<T> extractOutput(List<T> accumulator) {
+     return accumulator;
+   }
+
+   /** Accumulators are encoded as lists of the input coder's elements. */
+   @Override
+   public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+     return ListCoder.of(inputCoder);
+   }
+
+   /** Outputs are encoded as lists of the input coder's elements. */
+   @Override
+   public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+     return ListCoder.of(inputCoder);
+   }
+ }
+
+ /**
+  * Creates a primitive {@link PCollectionView}.
+  *
+  * <p>For internal use only by runner implementors. Expansion hands back the view that
+  * was supplied to {@link #of}; the input collection is not inspected here.
+  *
+  * @param <ElemT> The type of the elements of the input PCollection
+  * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+  */
+ public static class CreateJStormPCollectionView<ElemT, ViewT>
+     extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+   private PCollectionView<ViewT> view;
+
+   /**
+    * Returns a transform whose expansion yields {@code view}.
+    *
+    * @param view the view to return from {@link #expand}
+    */
+   public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
+       PCollectionView<ViewT> view) {
+     CreateJStormPCollectionView<ElemT, ViewT> created =
+         new CreateJStormPCollectionView<>(view);
+     return created;
+   }
+
+   // Instances are obtained through of(...).
+   private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
+     this.view = view;
+   }
+
+   @Override
+   public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+     return view;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
new file mode 100644
index 0000000..0bf9a49
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+
+/**
+ * Translates a {@link Window.Assign} transform: marks the user graph as windowed and
+ * registers a {@link WindowAssignExecutor} built from the transform's window function.
+ *
+ * @param <T> the type parameter of the translated {@link Window.Assign} transform
+ */
+public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
+
+  @Override
+  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext graph = context.getUserGraphContext();
+    String executorDescription =
+        describeTransform(transform, graph.getInputs(), graph.getOutputs());
+
+    // The graph is flagged as windowed before the executor is created and registered.
+    context.getUserGraphContext().setWindowed();
+
+    context.addTransformExecutor(
+        new WindowAssignExecutor(
+            executorDescription,
+            transform.getWindowFn(),
+            graph.getOutputTag()));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
new file mode 100644
index 0000000..b67aff9
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles a {@link Window.Assign} node by marking the user graph as windowed when the
+ * transform's window function is {@link FixedWindows} or {@link SlidingWindows}; any
+ * other window function is rejected.
+ *
+ * @param <T> the type parameter of the translated {@link Window.Assign} transform
+ */
+public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
+
+  /**
+   * Flags the graph as windowed; no executor is registered here.
+   *
+   * <p>NOTE(review): the original comment stated that assigning the window strategy is
+   * handled in AssignTranslator -- confirm which translator that refers to.
+   *
+   * @throws UnsupportedOperationException if the window function is neither
+   *     {@link FixedWindows} nor {@link SlidingWindows}
+   */
+  @Override
+  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+    boolean supported = transform.getWindowFn() instanceof FixedWindows
+        || transform.getWindowFn() instanceof SlidingWindows;
+    if (!supported) {
+      throw new UnsupportedOperationException(
+          "Not supported window type currently: " + transform.getWindowFn());
+    }
+    context.getUserGraphContext().setWindowed();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
new file mode 100644
index 0000000..07a3ad5
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.util;
+
+/**
+ * Holder for string constants shared across the JStorm translation code.
+ *
+ * <p>This class only exposes constants and is not meant to be instantiated.
+ */
+public class CommonInstance {
+  // NOTE(review): presumably the field names used for keyed tuples -- confirm against callers.
+  public static final String KEY = "Key";
+  public static final String VALUE = "Value";
+
+  /** Identifier of the stream named {@code BEAM_WATERMARK}. */
+  public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
+
+  // Constants-only holder: suppress the implicit public constructor.
+  private CommonInstance() {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
new file mode 100644
index 0000000..87562fd
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * {@link SideInputReader} that holds no side inputs: {@link #isEmpty()} is always
+ * {@code true}, {@link #contains} is always {@code false} and {@link #get} always
+ * returns {@code null}.
+ */
+public class DefaultSideInputReader implements SideInputReader, Serializable {
+  /** Always reports that this reader has no side inputs. */
+  @Override
+  public boolean isEmpty() {
+    return true;
+  }
+
+  /** Always reports the given view as absent. */
+  @Override
+  public <T> boolean contains(PCollectionView<T> pCollectionView) {
+    return false;
+  }
+
+  /** Always returns {@code null}, whichever view and window are requested. */
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
+    return null;
+  }
+}