You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:12 UTC
[12/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
index 2d80617..7f98c61 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
@@ -17,177 +17,175 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.jstorm.JStormPipelineOptions;
import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-import com.alibaba.jstorm.utils.KryoSerializer;
+import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
-
-import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* Spout implementation that wraps a Beam UnboundedSource
- *
+ * <p>
* TODO: add wrapper to support metrics in UnboundedSource.
*/
public class UnboundedSourceSpout extends AdaptorBasicSpout {
- private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
-
- private final String description;
- private final UnboundedSource source;
- private final SerializedPipelineOptions serializedOptions;
- private final TupleTag<?> outputTag;
-
- private transient JStormPipelineOptions pipelineOptions;
- private transient UnboundedSource.UnboundedReader reader;
- private transient SpoutOutputCollector collector;
-
- private volatile boolean hasNextRecord;
- private AtomicBoolean activated = new AtomicBoolean();
-
- private KryoSerializer<WindowedValue> serializer;
-
- private long lastWaterMark = 0l;
-
- public UnboundedSourceSpout(
- String description,
- UnboundedSource source,
- JStormPipelineOptions options,
- TupleTag<?> outputTag) {
- this.description = checkNotNull(description, "description");
- this.source = checkNotNull(source, "source");
- this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
- this.outputTag = checkNotNull(outputTag, "outputTag");
- }
-
- @Override
- public synchronized void close() {
- try {
- activated.set(false);
- this.reader.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void activate() {
- activated.set(true);
-
- }
-
- @Override
- public void deactivate() {
- activated.set(false);
- }
-
- @Override
- public void ack(Object msgId) {
- throw new UnsupportedOperationException();
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+
+ private final String description;
+ private final UnboundedSource source;
+ private final SerializedPipelineOptions serializedOptions;
+ private final TupleTag<?> outputTag;
+
+ private transient JStormPipelineOptions pipelineOptions;
+ private transient UnboundedSource.UnboundedReader reader;
+ private transient SpoutOutputCollector collector;
+
+ private volatile boolean hasNextRecord;
+ private AtomicBoolean activated = new AtomicBoolean();
+
+ private KryoSerializer<WindowedValue> serializer;
+
+ private long lastWaterMark = 0l;
+
+ public UnboundedSourceSpout(
+ String description,
+ UnboundedSource source,
+ JStormPipelineOptions options,
+ TupleTag<?> outputTag) {
+ this.description = checkNotNull(description, "description");
+ this.source = checkNotNull(source, "source");
+ this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
+ this.outputTag = checkNotNull(outputTag, "outputTag");
+ }
+
+ @Override
+ public synchronized void close() {
+ try {
+ activated.set(false);
+ this.reader.close();
+ } catch (IOException e) {
+ e.printStackTrace();
}
-
- @Override
- public void fail(Object msgId) {
- throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void activate() {
+ activated.set(true);
+
+ }
+
+ @Override
+ public void deactivate() {
+ activated.set(false);
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ try {
+ this.collector = collector;
+ this.pipelineOptions =
+ this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+
+ createSourceReader(null);
+
+ this.serializer = new KryoSerializer<>(conf);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create unbounded reader.", e);
}
+ }
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
+ public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
+ if (reader != null) {
+ reader.close();
}
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- try {
- this.collector = collector;
- this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
-
- createSourceReader(null);
-
- this.serializer = new KryoSerializer<>(conf);
- } catch (IOException e) {
- throw new RuntimeException("Unable to create unbounded reader.", e);
- }
+ reader = this.source.createReader(this.pipelineOptions, checkpointMark);
+ hasNextRecord = this.reader.start();
+ }
+
+ @Override
+ public synchronized void nextTuple() {
+ if (!activated.get()) {
+ return;
}
-
- public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
- if (reader != null) {
- reader.close();
+ try {
+ if (!hasNextRecord) {
+ hasNextRecord = reader.advance();
+ }
+
+ while (hasNextRecord && activated.get()) {
+ Object value = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ WindowedValue wv =
+ WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ LOG.debug("Source output: " + wv.getValue());
+ if (keyedEmit(outputTag.getId())) {
+ KV kv = (KV) wv.getValue();
+ // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+ byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
+ collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
+ } else {
+ byte[] immutableValue = serializer.serialize(wv);
+ collector.emit(outputTag.getId(), new Values(immutableValue));
}
- reader = this.source.createReader(this.pipelineOptions, checkpointMark);
- hasNextRecord = this.reader.start();
- }
- @Override
- public synchronized void nextTuple() {
- if (!activated.get()) {
- return;
- }
- try {
- if (!hasNextRecord) {
- hasNextRecord = reader.advance();
- }
-
- while (hasNextRecord && activated.get()) {
- Object value = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
-
- WindowedValue wv = WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
- LOG.debug("Source output: " + wv.getValue());
- if (keyedEmit(outputTag.getId())) {
- KV kv = (KV) wv.getValue();
- // Convert WindowedValue<KV> to <K, WindowedValue<V>>
- byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
- collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
- } else {
- byte[] immutableValue = serializer.serialize(wv);
- collector.emit(outputTag.getId(), new Values(immutableValue));
- }
-
- // move to next record
- hasNextRecord = reader.advance();
- }
-
- Instant waterMark = reader.getWatermark();
- if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
- lastWaterMark = waterMark.getMillis();
- collector.flush();
- collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
- LOG.debug("Source output: WM-{}", waterMark.toDateTime());
- }
- } catch (IOException e) {
- throw new RuntimeException("Exception reading values from source.", e);
- }
+ // move to next record
+ hasNextRecord = reader.advance();
+ }
+
+ Instant waterMark = reader.getWatermark();
+ if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
+ lastWaterMark = waterMark.getMillis();
+ collector.flush();
+ collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
+ LOG.debug("Source output: WM-{}", waterMark.toDateTime());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Exception reading values from source.", e);
}
+ }
- public UnboundedSource getUnboundedSource() {
- return source;
- }
+ public UnboundedSource getUnboundedSource() {
+ return source;
+ }
- public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
- return reader;
- }
+ public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
+ return reader;
+ }
- @Override
- public String toString() {
- return description;
- }
+ @Override
+ public String toString() {
+ return description;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
index 7b0e8db..4320967 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
@@ -26,30 +26,31 @@ import org.apache.beam.sdk.values.TupleTag;
*/
public class ViewExecutor implements Executor {
- private final String description;
- private final TupleTag outputTag;
- private ExecutorsBolt executorsBolt;
-
- public ViewExecutor(String description, TupleTag outputTag) {
- this.description = description;
- this.outputTag = outputTag;
- }
-
- @Override
- public void init(ExecutorContext context) {
- this.executorsBolt = context.getExecutorsBolt();
- }
-
- @Override
- public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- executorsBolt.processExecutorElem(outputTag, elem);
- }
-
- @Override
- public void cleanup() {}
-
- @Override
- public String toString() {
- return description;
- }
+ private final String description;
+ private final TupleTag outputTag;
+ private ExecutorsBolt executorsBolt;
+
+ public ViewExecutor(String description, TupleTag outputTag) {
+ this.description = description;
+ this.outputTag = outputTag;
+ }
+
+ @Override
+ public void init(ExecutorContext context) {
+ this.executorsBolt = context.getExecutorsBolt();
+ }
+
+ @Override
+ public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+ executorsBolt.processExecutorElem(outputTag, elem);
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
index a6c3c16..7f21d26 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
@@ -17,7 +17,10 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.common.collect.Iterables;
+import java.util.Collection;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
@@ -26,82 +29,79 @@ import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
- private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
-
- private final String description;
- private WindowFn<T, W> windowFn;
- private ExecutorsBolt executorsBolt;
- private TupleTag outputTag;
-
- class JStormAssignContext<InputT, W extends BoundedWindow>
- extends WindowFn<InputT, W>.AssignContext {
- private final WindowedValue<InputT> value;
-
- JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
- fn.super();
- checkArgument(
- Iterables.size(value.getWindows()) == 1,
- String.format(
- "%s passed to window assignment must be in a single window, but it was in %s: %s",
- WindowedValue.class.getSimpleName(),
- Iterables.size(value.getWindows()),
- value.getWindows()));
- this.value = value;
- }
-
- @Override
- public InputT element() {
- return value.getValue();
- }
-
- @Override
- public Instant timestamp() {
- return value.getTimestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return Iterables.getOnlyElement(value.getWindows());
- }
+ private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
+
+ private final String description;
+ private WindowFn<T, W> windowFn;
+ private ExecutorsBolt executorsBolt;
+ private TupleTag outputTag;
+
+ class JStormAssignContext<InputT, W extends BoundedWindow>
+ extends WindowFn<InputT, W>.AssignContext {
+ private final WindowedValue<InputT> value;
+
+ JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+ fn.super();
+ checkArgument(
+ Iterables.size(value.getWindows()) == 1,
+ String.format(
+ "%s passed to window assignment must be in a single window, but it was in %s: %s",
+ WindowedValue.class.getSimpleName(),
+ Iterables.size(value.getWindows()),
+ value.getWindows()));
+ this.value = value;
}
- public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
- this.description = description;
- this.windowFn = windowFn;
- this.outputTag = outputTag;
+ @Override
+ public InputT element() {
+ return value.getValue();
}
@Override
- public void init(ExecutorContext context) {
- this.executorsBolt = context.getExecutorsBolt();
+ public Instant timestamp() {
+ return value.getTimestamp();
}
@Override
- public void process(TupleTag tag, WindowedValue elem) {
- Collection<W> windows = null;
- try {
- windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
- for (W window: windows) {
- executorsBolt.processExecutorElem(
- outputTag,
- WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
- }
- } catch (Exception e) {
- LOG.warn("Failed to assign windows for elem=" + elem, e);
- }
+ public BoundedWindow window() {
+ return Iterables.getOnlyElement(value.getWindows());
}
+ }
+
+ public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
+ this.description = description;
+ this.windowFn = windowFn;
+ this.outputTag = outputTag;
+ }
+
+ @Override
+ public void init(ExecutorContext context) {
+ this.executorsBolt = context.getExecutorsBolt();
+ }
+
+ @Override
+ public void process(TupleTag tag, WindowedValue elem) {
+ Collection<W> windows = null;
+ try {
+ windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
+ for (W window : windows) {
+ executorsBolt.processExecutorElem(
+ outputTag,
+ WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to assign windows for elem=" + elem, e);
+ }
+ }
- @Override
- public void cleanup() {}
+ @Override
+ public void cleanup() {
+ }
- @Override
- public String toString() {
- return description;
- }
+ @Override
+ public String toString() {
+ return description;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
index eaf0549..1466f35 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,162 +17,161 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime.state;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.alibaba.jstorm.cache.ComposedKey;
import com.alibaba.jstorm.cache.IKvStore;
import com.alibaba.jstorm.cache.KvStoreIterable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.ReadableState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* JStorm implementation of {@link BagState}.
*/
class JStormBagState<K, T> implements BagState<T> {
- private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
-
- @Nullable
- private final K key;
- private final StateNamespace namespace;
- private final IKvStore<ComposedKey, T> kvState;
- private final IKvStore<ComposedKey, Object> stateInfoKvState;
- private int elemIndex;
-
- public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
- IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
- this.key = key;
- this.namespace = checkNotNull(namespace, "namespace");
- this.kvState = checkNotNull(kvState, "kvState");
- this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
-
- Integer index = (Integer) stateInfoKvState.get(getComposedKey());
- this.elemIndex = index != null ? ++index : 0;
+ private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
+
+ @Nullable
+ private final K key;
+ private final StateNamespace namespace;
+ private final IKvStore<ComposedKey, T> kvState;
+ private final IKvStore<ComposedKey, Object> stateInfoKvState;
+ private int elemIndex;
+
+ public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
+ IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+ this.key = key;
+ this.namespace = checkNotNull(namespace, "namespace");
+ this.kvState = checkNotNull(kvState, "kvState");
+ this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+
+ Integer index = (Integer) stateInfoKvState.get(getComposedKey());
+ this.elemIndex = index != null ? ++index : 0;
+ }
+
+ @Override
+ public void add(T input) {
+ try {
+ kvState.put(getComposedKey(elemIndex), input);
+ stateInfoKvState.put(getComposedKey(), elemIndex);
+ elemIndex++;
+ } catch (IOException e) {
+ throw new RuntimeException(e.getCause());
}
-
- @Override
- public void add(T input) {
- try {
- kvState.put(getComposedKey(elemIndex), input);
- stateInfoKvState.put(getComposedKey(), elemIndex);
- elemIndex++;
- } catch (IOException e) {
- throw new RuntimeException(e.getCause());
- }
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public Boolean read() {
+ return elemIndex <= 0;
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public Iterable<T> read() {
+ return new BagStateIterable(elemIndex);
+ }
+
+ @Override
+ public BagState readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ try {
+ for (int i = 0; i < elemIndex; i++) {
+ kvState.remove(getComposedKey(i));
+ }
+ stateInfoKvState.remove(getComposedKey());
+ elemIndex = 0;
+ } catch (IOException e) {
+ throw new RuntimeException(e.getCause());
}
+ }
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- return elemIndex <= 0;
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- // TODO: support prefetch.
- return this;
- }
- };
- }
+ private ComposedKey getComposedKey() {
+ return ComposedKey.of(key, namespace);
+ }
- @Override
- public Iterable<T> read() {
- return new BagStateIterable(elemIndex);
- }
+ private ComposedKey getComposedKey(int elemIndex) {
+ return ComposedKey.of(key, namespace, elemIndex);
+ }
- @Override
- public BagState readLater() {
- // TODO: support prefetch.
- return this;
- }
+ private class BagStateIterable implements KvStoreIterable<T> {
- @Override
- public void clear() {
+ private class BagStateIterator implements Iterator<T> {
+ private final int size;
+ private int cursor = 0;
+
+ BagStateIterator() {
+ Integer s = null;
try {
- for (int i = 0; i < elemIndex; i++) {
- kvState.remove(getComposedKey(i));
- }
- stateInfoKvState.remove(getComposedKey());
- elemIndex = 0;
+ s = (Integer) stateInfoKvState.get(getComposedKey());
} catch (IOException e) {
- throw new RuntimeException(e.getCause());
+ LOG.error("Failed to get elemIndex for key={}", getComposedKey());
+ }
+ this.size = s != null ? ++s : 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cursor < size;
+ }
+
+ @Override
+ public T next() {
+ if (cursor >= size) {
+ throw new NoSuchElementException();
}
- }
-
- private ComposedKey getComposedKey() {
- return ComposedKey.of(key, namespace);
- }
-
- private ComposedKey getComposedKey(int elemIndex) {
- return ComposedKey.of(key, namespace, elemIndex);
- }
- private class BagStateIterable implements KvStoreIterable<T> {
-
- private class BagStateIterator implements Iterator<T> {
- private final int size;
- private int cursor = 0;
-
- BagStateIterator() {
- Integer s = null;
- try {
- s = (Integer) stateInfoKvState.get(getComposedKey());
- } catch (IOException e) {
- LOG.error("Failed to get elemIndex for key={}", getComposedKey());
- }
- this.size = s != null ? ++s : 0;
- }
-
- @Override
- public boolean hasNext() {
- return cursor < size;
- }
-
- @Override
- public T next() {
- if (cursor >= size) {
- throw new NoSuchElementException();
- }
-
- T value = null;
- try {
- value = kvState.get(getComposedKey(cursor));
- } catch (IOException e) {
- LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
- }
- cursor++;
- return value;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
+ T value = null;
+ try {
+ value = kvState.get(getComposedKey(cursor));
+ } catch (IOException e) {
+ LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
}
+ cursor++;
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
- private final int size;
+ private final int size;
- BagStateIterable(int size) {
- this.size = size;
- }
+ BagStateIterable(int size) {
+ this.size = size;
+ }
- @Override
- public Iterator<T> iterator() {
- return new BagStateIterator();
- }
+ @Override
+ public Iterator<T> iterator() {
+ return new BagStateIterator();
+ }
- @Override
- public String toString() {
- return String.format("BagStateIterable: composedKey=%s", getComposedKey());
- }
+ @Override
+ public String toString() {
+ return String.format("BagStateIterable: composedKey=%s", getComposedKey());
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
index b0fe29b..7c6a239 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,7 +20,6 @@ package org.apache.beam.runners.jstorm.translation.runtime.state;
import static com.google.common.base.Preconditions.checkNotNull;
import javax.annotation.Nullable;
-
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.ReadableState;
@@ -30,59 +29,60 @@ import org.apache.beam.sdk.transforms.Combine;
* JStorm implementation of {@link CombiningState}.
*/
public class JStormCombiningState<InputT, AccumT, OutputT>
- implements CombiningState<InputT, AccumT, OutputT> {
+ implements CombiningState<InputT, AccumT, OutputT> {
+
+ @Nullable
+ private final BagState<AccumT> accumBagState;
+ private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
- @Nullable
- private final BagState<AccumT> accumBagState;
- private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
- JStormCombiningState(
- BagState<AccumT> accumBagState,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- this.accumBagState = checkNotNull(accumBagState, "accumBagState");
- this.combineFn = checkNotNull(combineFn, "combineFn");
- }
+ JStormCombiningState(
+ BagState<AccumT> accumBagState,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ this.accumBagState = checkNotNull(accumBagState, "accumBagState");
+ this.combineFn = checkNotNull(combineFn, "combineFn");
+ }
- @Override
- public AccumT getAccum() {
- // TODO: replacing the accumBagState with the merged accum.
- return combineFn.mergeAccumulators(accumBagState.read());
- }
+ @Override
+ public AccumT getAccum() {
+ // TODO: replacing the accumBagState with the merged accum.
+ return combineFn.mergeAccumulators(accumBagState.read());
+ }
- @Override
- public void addAccum(AccumT accumT) {
- accumBagState.add(accumT);
- }
+ @Override
+ public void addAccum(AccumT accumT) {
+ accumBagState.add(accumT);
+ }
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
- return combineFn.mergeAccumulators(iterable);
- }
+ @Override
+ public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
+ return combineFn.mergeAccumulators(iterable);
+ }
- @Override
- public void add(InputT input) {
- accumBagState.add(
- combineFn.addInput(combineFn.createAccumulator(), input));
- }
+ @Override
+ public void add(InputT input) {
+ accumBagState.add(
+ combineFn.addInput(combineFn.createAccumulator(), input));
+ }
- @Override
- public ReadableState<Boolean> isEmpty() {
- return accumBagState.isEmpty();
- }
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return accumBagState.isEmpty();
+ }
- @Override
- public OutputT read() {
- return combineFn.extractOutput(
- combineFn.mergeAccumulators(accumBagState.read()));
- }
+ @Override
+ public OutputT read() {
+ return combineFn.extractOutput(
+ combineFn.mergeAccumulators(accumBagState.read()));
+ }
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- // TODO: support prefetch.
- return this;
- }
+ @Override
+ public CombiningState<InputT, AccumT, OutputT> readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
- @Override
- public void clear() {
- accumBagState.clear();
- }
+ @Override
+ public void clear() {
+ accumBagState.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
index f101beb..f1c1ed0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,137 +18,136 @@
package org.apache.beam.runners.jstorm.translation.runtime.state;
import com.alibaba.jstorm.cache.IKvStore;
+import java.io.IOException;
+import java.util.Map;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-
public class JStormMapState<K, V> implements MapState<K, V> {
- private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
-
- private final K key;
- private final StateNamespace namespace;
- private IKvStore<K, V> kvStore;
-
- public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
- this.key = key;
- this.namespace = namespace;
- this.kvStore = kvStore;
+ private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
+
+ private final K key;
+ private final StateNamespace namespace;
+ private IKvStore<K, V> kvStore;
+
+ public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+ this.key = key;
+ this.namespace = namespace;
+ this.kvStore = kvStore;
+ }
+
+ @Override
+ public void put(K var1, V var2) {
+ try {
+ kvStore.put(var1, var2);
+ } catch (IOException e) {
+ reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
}
-
- @Override
- public void put(K var1, V var2) {
- try {
- kvStore.put(var1, var2);
- } catch (IOException e) {
- reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
- }
+ }
+
+ @Override
+ public ReadableState<V> putIfAbsent(K var1, V var2) {
+ ReadableState<V> ret = null;
+ try {
+ V value = kvStore.get(var1);
+ if (value == null) {
+ kvStore.put(var1, var2);
+ ret = new MapReadableState<>(null);
+ } else {
+ ret = new MapReadableState<>(value);
+ }
+ } catch (IOException e) {
+ reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
}
-
- @Override
- public ReadableState<V> putIfAbsent(K var1, V var2) {
- ReadableState<V> ret = null;
- try {
- V value = kvStore.get(var1);
- if (value == null) {
- kvStore.put(var1, var2);
- ret = new MapReadableState<>(null);
- } else {
- ret = new MapReadableState<>(value);
- }
- } catch (IOException e) {
- reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
- }
- return ret;
+ return ret;
+ }
+
+ @Override
+ public void remove(K var1) {
+ try {
+ kvStore.remove(var1);
+ } catch (IOException e) {
+ reportError(String.format("Failed to remove key=%s", var1), e);
}
-
- @Override
- public void remove(K var1) {
- try {
- kvStore.remove(var1);
- } catch (IOException e) {
- reportError(String.format("Failed to remove key=%s", var1), e);
- }
+ }
+
+ @Override
+ public ReadableState<V> get(K var1) {
+ ReadableState<V> ret = new MapReadableState<>(null);
+ try {
+ ret = new MapReadableState<>(kvStore.get(var1)); // diamond instead of a raw type
+ } catch (IOException e) {
+ reportError(String.format("Failed to get value for key=%s", var1), e);
+ }
-
- @Override
- public ReadableState<V> get(K var1) {
- ReadableState<V> ret = new MapReadableState<>(null);
- try {
- ret = new MapReadableState(kvStore.get(var1));
- } catch (IOException e) {
- reportError(String.format("Failed to get value for key=%s", var1), e);
- }
- return ret;
+ return ret;
+ }
+
+ @Override
+ public ReadableState<Iterable<K>> keys() {
+ ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
+ try {
+ ret = new MapReadableState<>(kvStore.keys());
+ } catch (IOException e) {
+ reportError(String.format("Failed to get keys"), e);
}
-
- @Override
- public ReadableState<Iterable<K>> keys() {
- ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
- try {
- ret = new MapReadableState<>(kvStore.keys());
- } catch (IOException e) {
- reportError(String.format("Failed to get keys"), e);
- }
- return ret;
+ return ret;
+ }
+
+ @Override
+ public ReadableState<Iterable<V>> values() {
+ ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
+ try {
+ ret = new MapReadableState<>(kvStore.values());
+ } catch (IOException e) {
+ reportError(String.format("Failed to get values"), e);
}
-
- @Override
- public ReadableState<Iterable<V>> values() {
- ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
- try {
- ret = new MapReadableState<>(kvStore.values());
- } catch (IOException e) {
- reportError(String.format("Failed to get values"), e);
- }
- return ret;
- }
-
- @Override
- public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
- ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
- try {
- ret = new MapReadableState<>(kvStore.entries());
- } catch (IOException e) {
- reportError(String.format("Failed to get values"), e);
- }
- return ret;
+ return ret;
+ }
+
+ @Override
+ public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+ ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
+ try {
+ ret = new MapReadableState<>(kvStore.entries());
+ } catch (IOException e) {
+ reportError("Failed to get entries", e); // message previously said "values"
+ }
-
- @Override
- public void clear() {
- try {
- Iterable<K> keys = kvStore.keys();
- kvStore.removeBatch(keys);
- } catch (IOException e) {
- reportError(String.format("Failed to clear map state"), e);
- }
+ return ret;
+ }
+
+ @Override
+ public void clear() {
+ try {
+ Iterable<K> keys = kvStore.keys();
+ kvStore.removeBatch(keys);
+ } catch (IOException e) {
+ reportError(String.format("Failed to clear map state"), e);
}
+ }
- private void reportError(String errorInfo, IOException e) {
- LOG.error(errorInfo, e);
- throw new RuntimeException(errorInfo);
- }
+ private void reportError(String errorInfo, IOException e) {
+ LOG.error(errorInfo, e);
+ throw new RuntimeException(errorInfo, e); // keep the IOException as the cause
+ }
- private class MapReadableState<T> implements ReadableState<T> {
- private T value;
+ private class MapReadableState<T> implements ReadableState<T> {
+ private T value;
- public MapReadableState(T value) {
- this.value = value;
- }
+ public MapReadableState(T value) {
+ this.value = value;
+ }
- @Override
- public T read() {
- return value;
- }
+ @Override
+ public T read() {
+ return value;
+ }
- @Override
- public ReadableState<T> readLater() {
- return this;
- }
+ @Override
+ public ReadableState<T> readLater() {
+ return this;
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
index 8a0cb73..80ef3a2 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,13 +17,16 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime.state;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.alibaba.jstorm.cache.ComposedKey;
import com.alibaba.jstorm.cache.IKvStoreManager;
-
+import java.io.IOException;
+import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
@@ -41,151 +44,148 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.joda.time.Instant;
-import javax.annotation.Nullable;
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* JStorm implementation of {@link StateInternals}.
*/
public class JStormStateInternals<K> implements StateInternals {
- private static final String STATE_INFO = "state-info:";
-
- @Nullable
- private final K key;
- private final IKvStoreManager kvStoreManager;
- private final TimerService timerService;
- private final int executorId;
-
- public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
- TimerService timerService, int executorId) {
- this.key = key;
- this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
- this.timerService = checkNotNull(timerService, "timerService");
- this.executorId = executorId;
- }
-
- @Nullable
- @Override
- public K getKey() {
- return key;
- }
-
- @Override
- public <T extends State> T state(
- StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
- // throw new UnsupportedOperationException("StateContext is not supported.");
- /**
- * TODO:
- * Same implementation as state() which is without StateContext. This might be updated after
- * we figure out if we really need StateContext for JStorm state internals.
- */
- return state(namespace, address);
- }
-
- @Override
- public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
- return address.getSpec().bind(address.getId(), new StateBinder() {
- @Override
- public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
- try {
- return new JStormValueState<>(
- getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
- } catch (IOException e) {
- throw new RuntimeException();
- }
- }
-
- @Override
- public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
- try {
- return new JStormBagState(
- getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
- kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
- } catch (IOException e) {
- throw new RuntimeException();
+ private static final String STATE_INFO = "state-info:";
+
+ @Nullable
+ private final K key;
+ private final IKvStoreManager kvStoreManager;
+ private final TimerService timerService;
+ private final int executorId;
+
+ public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
+ TimerService timerService, int executorId) {
+ this.key = key;
+ this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
+ this.timerService = checkNotNull(timerService, "timerService");
+ this.executorId = executorId;
+ }
+
+ @Nullable
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public <T extends State> T state(
+ StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
+ // throw new UnsupportedOperationException("StateContext is not supported.");
+ /**
+ * TODO:
+ * Same implementation as state() which is without StateContext. This might be updated after
+ * we figure out if we really need StateContext for JStorm state internals.
+ */
+ return state(namespace, address);
+ }
+
+ @Override
+ public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
+ return address.getSpec().bind(address.getId(), new StateBinder() {
+ @Override
+ public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
+ try {
+ return new JStormValueState<>(
+ getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
+ } catch (IOException e) {
+ throw new RuntimeException(e); // propagate the store failure, as bindMap does
+ }
+ }
+
+ @Override
+ public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
+ try {
+ return new JStormBagState(
+ getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
+ kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+ } catch (IOException e) {
+ throw new RuntimeException(e); // propagate the store failure, as bindMap does
+ }
+ }
+
+ @Override
+ public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+ String id,
+ StateSpec<MapState<KeyT, ValueT>> spec,
+ Coder<KeyT> mapKeyCoder,
+ Coder<ValueT> mapValueCoder) {
+ try {
+ return new JStormMapState<>(
+ getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public <InputT, AccumT, OutputT> CombiningState bindCombining(
+ String id,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+ Coder<AccumT> accumCoder,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ try {
+ BagState<AccumT> accumBagState = new JStormBagState(
+ getKey(), namespace,
+ kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
+ kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+ return new JStormCombiningState<>(accumBagState, combineFn);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // propagate the store failure, as bindMap does
+ }
+ }
+
+
+ @Override
+ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
+ bindCombiningWithContext(
+ String id,
+ StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public WatermarkHoldState bindWatermark(
+ String id,
+ StateSpec<WatermarkHoldState> spec,
+ final TimestampCombiner timestampCombiner) {
+ try {
+ BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
+ getKey(), namespace,
+ kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
+ kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+
+ Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
+ new BinaryCombineFn<Instant>() {
+ @Override
+ public Instant apply(Instant left, Instant right) {
+ return timestampCombiner.combine(left, right);
}
- }
-
- @Override
- public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- String id,
- StateSpec<MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder,
- Coder<ValueT> mapValueCoder) {
- try {
- return new JStormMapState<>(getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState bindCombining(
- String id,
- StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- try {
- BagState<AccumT> accumBagState = new JStormBagState(
- getKey(), namespace,
- kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
- kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
- return new JStormCombiningState<>(accumBagState, combineFn);
- } catch (IOException e) {
- throw new RuntimeException();
- }
- }
-
-
- @Override
- public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
- bindCombiningWithContext(
- String id,
- StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
- CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public WatermarkHoldState bindWatermark(
- String id,
- StateSpec<WatermarkHoldState> spec,
- final TimestampCombiner timestampCombiner) {
- try {
- BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
- getKey(), namespace,
- kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
- kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-
- Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
- new BinaryCombineFn<Instant>() {
- @Override
- public Instant apply(Instant left, Instant right) {
- return timestampCombiner.combine(left, right);
- }};
- return new JStormWatermarkHoldState(
- namespace,
- new JStormCombiningState<>(
- accumBagState,
- outputTimeCombineFn),
- timestampCombiner,
- timerService);
- } catch (IOException e) {
- throw new RuntimeException();
- }
- }
- });
- }
-
- private String getStoreId(String stateId) {
- return String.format("%s-%s", stateId, executorId);
- }
+ };
+ return new JStormWatermarkHoldState(
+ namespace,
+ new JStormCombiningState<>(
+ accumBagState,
+ outputTimeCombineFn),
+ timestampCombiner,
+ timerService);
+ } catch (IOException e) {
+ throw new RuntimeException();
+ }
+ }
+ });
+ }
+
+ private String getStoreId(String stateId) {
+ return String.format("%s-%s", stateId, executorId);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
index 5ad3663..79ff6b4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,66 +19,64 @@ package org.apache.beam.runners.jstorm.translation.runtime.state;
import com.alibaba.jstorm.cache.ComposedKey;
import com.alibaba.jstorm.cache.IKvStore;
-
+import java.io.IOException;
+import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.sdk.state.ValueState;
-import javax.annotation.Nullable;
-import java.io.IOException;
-
/**
* JStorm implementation of {@link ValueState}.
*/
public class JStormValueState<K, T> implements ValueState<T> {
- @Nullable
- private final K key;
- private final StateNamespace namespace;
- private final IKvStore<ComposedKey, T> kvState;
+ @Nullable
+ private final K key;
+ private final StateNamespace namespace;
+ private final IKvStore<ComposedKey, T> kvState;
- JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
- this.key = key;
- this.namespace = namespace;
- this.kvState = kvState;
- }
+ JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+ this.key = key;
+ this.namespace = namespace;
+ this.kvState = kvState;
+ }
- @Override
- public void write(T t) {
- try {
- kvState.put(getComposedKey(), t);
- } catch (IOException e) {
- throw new RuntimeException(String.format(
- "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
- }
+ @Override
+ public void write(T t) {
+ try {
+ kvState.put(getComposedKey(), t);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t), e);
+ }
+ }
- @Override
- public T read() {
- try {
- return kvState.get(getComposedKey());
- } catch (IOException e) {
- throw new RuntimeException(String.format(
- "Failed to read key: %s, namespace: %s.", key, namespace));
- }
+ @Override
+ public T read() {
+ try {
+ return kvState.get(getComposedKey());
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Failed to read key: %s, namespace: %s.", key, namespace), e);
+ }
+ }
- @Override
- public ValueState<T> readLater() {
- // TODO: support prefetch.
- return this;
- }
+ @Override
+ public ValueState<T> readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
- @Override
- public void clear() {
- try {
- kvState.remove(getComposedKey());
- } catch (IOException e) {
- throw new RuntimeException(String.format(
- "Failed to clear key: %s, namespace: %s.", key, namespace));
- }
+ @Override
+ public void clear() {
+ try {
+ kvState.remove(getComposedKey());
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Failed to clear key: %s, namespace: %s.", key, namespace), e);
+ }
+ }
- private ComposedKey getComposedKey() {
- return ComposedKey.of(key, namespace);
- }
+ private ComposedKey getComposedKey() {
+ return ComposedKey.of(key, namespace);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
index 659d77c..dc3ba43 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,8 +19,8 @@ package org.apache.beam.runners.jstorm.translation.runtime.state;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.WatermarkHoldState;
@@ -32,52 +32,52 @@ import org.joda.time.Instant;
*/
public class JStormWatermarkHoldState implements WatermarkHoldState {
- private final StateNamespace namespace;
- private final GroupingState<Instant, Instant> watermarkHoldsState;
- private final TimestampCombiner timestampCombiner;
- private final TimerService timerService;
+ private final StateNamespace namespace;
+ private final GroupingState<Instant, Instant> watermarkHoldsState;
+ private final TimestampCombiner timestampCombiner;
+ private final TimerService timerService;
- JStormWatermarkHoldState(
- StateNamespace namespace,
- GroupingState<Instant, Instant> watermarkHoldsState,
- TimestampCombiner timestampCombiner,
- TimerService timerService) {
- this.namespace = checkNotNull(namespace, "namespace");
- this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
- this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
- this.timerService = checkNotNull(timerService, "timerService");
- }
+ JStormWatermarkHoldState(
+ StateNamespace namespace,
+ GroupingState<Instant, Instant> watermarkHoldsState,
+ TimestampCombiner timestampCombiner,
+ TimerService timerService) {
+ this.namespace = checkNotNull(namespace, "namespace");
+ this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
+ this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
+ this.timerService = checkNotNull(timerService, "timerService");
+ }
- @Override
- public TimestampCombiner getTimestampCombiner() {
- return timestampCombiner;
- }
+ @Override
+ public TimestampCombiner getTimestampCombiner() {
+ return timestampCombiner;
+ }
- @Override
- public void add(Instant instant) {
- timerService.addWatermarkHold(namespace.stringKey(), instant);
- watermarkHoldsState.add(instant);
- }
+ @Override
+ public void add(Instant instant) {
+ timerService.addWatermarkHold(namespace.stringKey(), instant);
+ watermarkHoldsState.add(instant);
+ }
- @Override
- public ReadableState<Boolean> isEmpty() {
- return watermarkHoldsState.isEmpty();
- }
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return watermarkHoldsState.isEmpty();
+ }
- @Override
- public Instant read() {
- return watermarkHoldsState.read();
- }
+ @Override
+ public Instant read() {
+ return watermarkHoldsState.read();
+ }
- @Override
- public WatermarkHoldState readLater() {
- // TODO: support prefetch.
- return this;
- }
+ @Override
+ public WatermarkHoldState readLater() {
+ // TODO: support prefetch.
+ return this;
+ }
- @Override
- public void clear() {
- timerService.clearWatermarkHold(namespace.stringKey());
- watermarkHoldsState.clear();
- }
+ @Override
+ public void clear() {
+ timerService.clearWatermarkHold(namespace.stringKey());
+ watermarkHoldsState.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
index 4b5f83c..184a957 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,83 +17,84 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime.timer;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.sdk.state.TimeDomain;
import org.joda.time.Instant;
-import javax.annotation.Nullable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* JStorm implementation of {@link TimerInternals}.
*/
public class JStormTimerInternals<K> implements TimerInternals {
- private final K key;
- private final DoFnExecutor<?, ?> doFnExecutor;
- private final TimerService timerService;
-
-
- public JStormTimerInternals(@Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
- this.key = key;
- this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
- this.timerService = checkNotNull(timerService, "timerService");
- }
-
- @Override
- public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
- setTimer(TimerData.of(timerId, namespace, target, timeDomain));
- }
-
- @Override
- @Deprecated
- public void setTimer(TimerData timerData) {
- timerService.setTimer(key, timerData, doFnExecutor);
- }
-
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
- throw new UnsupportedOperationException(
- "Canceling of a timer is not yet supported.");
- }
-
- @Override
- @Deprecated
- public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException(
- "Canceling of a timer is not yet supported.");
- }
-
- @Override
- @Deprecated
- public void deleteTimer(TimerData timerData) {
- throw new UnsupportedOperationException(
- "Canceling of a timer is not yet supported.");
- }
-
- @Override
- public Instant currentProcessingTime() {
- return Instant.now();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return null;
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return new Instant(timerService.currentInputWatermark());
- }
-
- @Override
- @Nullable
- public Instant currentOutputWatermarkTime() {
- return new Instant(timerService.currentOutputWatermark());
- }
+ private final K key;
+ private final DoFnExecutor<?, ?> doFnExecutor;
+ private final TimerService timerService;
+
+
+ public JStormTimerInternals(
+ @Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
+ this.key = key;
+ this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
+ this.timerService = checkNotNull(timerService, "timerService");
+ }
+
+ @Override
+ public void setTimer(
+ StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+ setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+ }
+
+ @Override
+ @Deprecated
+ public void setTimer(TimerData timerData) {
+ timerService.setTimer(key, timerData, doFnExecutor);
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer is not yet supported.");
+ }
+
+ @Override
+ @Deprecated
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer is not yet supported.");
+ }
+
+ @Override
+ @Deprecated
+ public void deleteTimer(TimerData timerData) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer is not yet supported.");
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return Instant.now();
+ }
+
+ @Override
+ @Nullable
+ public Instant currentSynchronizedProcessingTime() {
+ return null;
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return new Instant(timerService.currentInputWatermark());
+ }
+
+ @Override
+ @Nullable
+ public Instant currentOutputWatermarkTime() {
+ return new Instant(timerService.currentOutputWatermark());
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
index 9651fc2..7e7a54a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
@@ -17,10 +17,9 @@
*/
package org.apache.beam.runners.jstorm.translation.translator;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-
-import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
@@ -33,18 +32,20 @@ import org.apache.beam.sdk.values.TupleTag;
*/
public class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
- @Override
- public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+ @Override
+ public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ String description =
+ describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
- TupleTag<?> outputTag = userGraphContext.getOutputTag();
- PValue outputValue = userGraphContext.getOutput();
- UnboundedSourceSpout spout = new UnboundedSourceSpout(
- description,
- new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
- userGraphContext.getOptions(), outputTag);
+ TupleTag<?> outputTag = userGraphContext.getOutputTag();
+ PValue outputValue = userGraphContext.getOutput();
+ UnboundedSourceSpout spout = new UnboundedSourceSpout(
+ description,
+ new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
+ userGraphContext.getOptions(), outputTag);
- context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(outputTag, outputValue));
- }
+ context.getExecutionGraphContext().registerSpout(
+ spout, TaggedPValue.of(outputTag, outputValue));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
index c4da58a..fe5fca9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.jstorm.translation.translator;
import org.apache.beam.sdk.transforms.Combine;
-public class CombineGloballyTranslator<InputT, OutputT> extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
-
+public class CombineGloballyTranslator<InputT, OutputT>
+ extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
index 99cbff7..c382fb7 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.jstorm.translation.translator;
import org.apache.beam.sdk.transforms.Combine;
-public class CombinePerKeyTranslator<K, InputT, OutputT> extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
-
+public class CombinePerKeyTranslator<K, InputT, OutputT>
+ extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
index 4558216..bf8d472 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
@@ -18,32 +18,30 @@
package org.apache.beam.runners.jstorm.translation.translator;
import com.google.common.collect.Maps;
-import org.apache.beam.sdk.transforms.Flatten;
-
+import java.util.Map;
import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.runners.jstorm.translation.runtime.FlattenExecutor;
+import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import java.util.Map;
-
public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
- @Override
- public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ @Override
+ public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- // Since a new tag is created in PCollectionList, retrieve the real tag here.
- Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
- for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
- PCollection<V> pc = (PCollection<V>) entry.getValue();
- inputs.putAll(pc.expand());
- }
- System.out.println("Real inputs: " + inputs);
- System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
- String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
- FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
- context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+ // Since a new tag is created in PCollectionList, retrieve the real tag here.
+ Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
+ for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
+ PCollection<V> pc = (PCollection<V>) entry.getValue();
+ inputs.putAll(pc.expand());
}
+ System.out.println("Real inputs: " + inputs);
+ System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
+ String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
+ FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+ context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
index 6b8297b..85f96ce 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
@@ -17,53 +17,52 @@
*/
package org.apache.beam.runners.jstorm.translation.translator;
-import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
import com.google.common.collect.Lists;
-import org.apache.beam.sdk.transforms.GroupByKey;
-
+import java.util.Collections;
+import java.util.List;
import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.Collections;
-import java.util.List;
+import org.apache.beam.sdk.values.WindowingStrategy;
public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
- // information of transform
- protected PCollection<KV<K, V>> input;
- protected PCollection<KV<K, Iterable<V>>> output;
- protected List<TupleTag<?>> inputTags;
- protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
- protected List<TupleTag<?>> sideOutputTags;
- protected List<PCollectionView<?>> sideInputs;
- protected WindowingStrategy<?, ?> windowingStrategy;
+ // information of transform
+ protected PCollection<KV<K, V>> input;
+ protected PCollection<KV<K, Iterable<V>>> output;
+ protected List<TupleTag<?>> inputTags;
+ protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
+ protected List<TupleTag<?>> sideOutputTags;
+ protected List<PCollectionView<?>> sideInputs;
+ protected WindowingStrategy<?, ?> windowingStrategy;
- @Override
- public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+ @Override
+ public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ String description =
+ describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
- input = (PCollection<KV<K, V>>) userGraphContext.getInput();
- output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
+ input = (PCollection<KV<K, V>>) userGraphContext.getInput();
+ output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
- inputTags = userGraphContext.getInputTags();
- mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
- sideOutputTags = Lists.newArrayList();
+ inputTags = userGraphContext.getInputTags();
+ mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
+ sideOutputTags = Lists.newArrayList();
- sideInputs = Collections.<PCollectionView<?>>emptyList();
- windowingStrategy = input.getWindowingStrategy();
+ sideInputs = Collections.<PCollectionView<?>>emptyList();
+ windowingStrategy = input.getWindowingStrategy();
- GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
- userGraphContext.getStepName(),
- description,
- context,
- context.getUserGraphContext().getOptions(),
- windowingStrategy,
- mainOutputTag,
- sideOutputTags);
- context.addTransformExecutor(groupByWindowExecutor);
- }
+ GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ context,
+ context.getUserGraphContext().getOptions(),
+ windowingStrategy,
+ mainOutputTag,
+ sideOutputTags);
+ context.addTransformExecutor(groupByWindowExecutor);
+ }
}