You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:12 UTC

[12/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
index 2d80617..7f98c61 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
@@ -17,177 +17,175 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
 import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-import com.alibaba.jstorm.utils.KryoSerializer;
+import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
-
-import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Spout implementation that wraps a Beam UnboundedSource
- *
+ * <p>
  * TODO: add wrapper to support metrics in UnboundedSource.
  */
 public class UnboundedSourceSpout extends AdaptorBasicSpout {
-    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
-
-    private final String description;
-    private final UnboundedSource source;
-    private final SerializedPipelineOptions serializedOptions;
-    private final TupleTag<?> outputTag;
-
-    private transient JStormPipelineOptions pipelineOptions;
-    private transient UnboundedSource.UnboundedReader reader;
-    private transient SpoutOutputCollector collector;
-
-    private volatile boolean hasNextRecord;
-    private AtomicBoolean activated = new AtomicBoolean();
-
-    private KryoSerializer<WindowedValue> serializer;
-
-    private long lastWaterMark = 0l;
-
-    public UnboundedSourceSpout(
-            String description,
-            UnboundedSource source,
-            JStormPipelineOptions options,
-            TupleTag<?> outputTag) {
-        this.description = checkNotNull(description, "description");
-        this.source = checkNotNull(source, "source");
-        this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
-        this.outputTag = checkNotNull(outputTag, "outputTag");
-    }
-
-    @Override
-    public synchronized void close() {
-        try {
-            activated.set(false);
-            this.reader.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public void activate() {
-        activated.set(true);
-        
-    }
-
-    @Override
-    public void deactivate() {
-        activated.set(false);
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        throw new UnsupportedOperationException();
+  private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+
+  private final String description;
+  private final UnboundedSource source;
+  private final SerializedPipelineOptions serializedOptions;
+  private final TupleTag<?> outputTag;
+
+  private transient JStormPipelineOptions pipelineOptions;
+  private transient UnboundedSource.UnboundedReader reader;
+  private transient SpoutOutputCollector collector;
+
+  private volatile boolean hasNextRecord;
+  private AtomicBoolean activated = new AtomicBoolean();
+
+  private KryoSerializer<WindowedValue> serializer;
+
+  private long lastWaterMark = 0l;
+
+  public UnboundedSourceSpout(
+      String description,
+      UnboundedSource source,
+      JStormPipelineOptions options,
+      TupleTag<?> outputTag) {
+    this.description = checkNotNull(description, "description");
+    this.source = checkNotNull(source, "source");
+    this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
+    this.outputTag = checkNotNull(outputTag, "outputTag");
+  }
+
+  @Override
+  public synchronized void close() {
+    try {
+      activated.set(false);
+      this.reader.close();
+    } catch (IOException e) {
+      e.printStackTrace();
     }
-
-    @Override
-    public void fail(Object msgId) {
-        throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void activate() {
+    activated.set(true);
+
+  }
+
+  @Override
+  public void deactivate() {
+    activated.set(false);
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  @Override
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    try {
+      this.collector = collector;
+      this.pipelineOptions =
+          this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+
+      createSourceReader(null);
+
+      this.serializer = new KryoSerializer<>(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create unbounded reader.", e);
     }
+  }
 
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
+  public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
+    if (reader != null) {
+      reader.close();
     }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        try {
-            this.collector = collector;
-            this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
-
-            createSourceReader(null);
-
-            this.serializer = new KryoSerializer<>(conf);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to create unbounded reader.", e);
-        }
+    reader = this.source.createReader(this.pipelineOptions, checkpointMark);
+    hasNextRecord = this.reader.start();
+  }
+
+  @Override
+  public synchronized void nextTuple() {
+    if (!activated.get()) {
+      return;
     }
-
-    public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
-        if (reader != null) {
-            reader.close();
+    try {
+      if (!hasNextRecord) {
+        hasNextRecord = reader.advance();
+      }
+
+      while (hasNextRecord && activated.get()) {
+        Object value = reader.getCurrent();
+        Instant timestamp = reader.getCurrentTimestamp();
+
+        WindowedValue wv =
+            WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+        LOG.debug("Source output: " + wv.getValue());
+        if (keyedEmit(outputTag.getId())) {
+          KV kv = (KV) wv.getValue();
+          // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+          byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
+          collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
+        } else {
+          byte[] immutableValue = serializer.serialize(wv);
+          collector.emit(outputTag.getId(), new Values(immutableValue));
         }
-        reader = this.source.createReader(this.pipelineOptions, checkpointMark);
-        hasNextRecord = this.reader.start();
-    }
 
-    @Override
-    public synchronized void nextTuple() {
-        if (!activated.get()) {
-            return;
-        }
-        try {
-            if (!hasNextRecord) {
-                hasNextRecord = reader.advance();
-            }
-
-            while (hasNextRecord && activated.get()) {
-                Object value = reader.getCurrent();
-                Instant timestamp = reader.getCurrentTimestamp();
-
-                WindowedValue wv = WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-                LOG.debug("Source output: " + wv.getValue());
-                if (keyedEmit(outputTag.getId())) {
-                    KV kv = (KV) wv.getValue();
-                    // Convert WindowedValue<KV> to <K, WindowedValue<V>>
-                    byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
-                    collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
-                } else {
-                    byte[] immutableValue = serializer.serialize(wv);
-                    collector.emit(outputTag.getId(), new Values(immutableValue));
-                }
-
-                // move to next record
-                hasNextRecord = reader.advance();
-            }
-
-            Instant waterMark = reader.getWatermark();
-            if (waterMark != null && lastWaterMark <  waterMark.getMillis()) {
-                lastWaterMark = waterMark.getMillis();
-                collector.flush();
-                collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
-                LOG.debug("Source output: WM-{}", waterMark.toDateTime());
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Exception reading values from source.", e);
-        }
+        // move to next record
+        hasNextRecord = reader.advance();
+      }
+
+      Instant waterMark = reader.getWatermark();
+      if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
+        lastWaterMark = waterMark.getMillis();
+        collector.flush();
+        collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
+        LOG.debug("Source output: WM-{}", waterMark.toDateTime());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Exception reading values from source.", e);
     }
+  }
 
-    public UnboundedSource getUnboundedSource() {
-        return source;
-    }
+  public UnboundedSource getUnboundedSource() {
+    return source;
+  }
 
-    public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
-        return reader;
-    }
+  public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
+    return reader;
+  }
 
-    @Override
-    public String toString() {
-        return description;
-    }
+  @Override
+  public String toString() {
+    return description;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
index 7b0e8db..4320967 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
@@ -26,30 +26,31 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 public class ViewExecutor implements Executor {
 
-    private final String description;
-    private final TupleTag outputTag;
-    private ExecutorsBolt executorsBolt;
-
-    public ViewExecutor(String description, TupleTag outputTag) {
-        this.description = description;
-        this.outputTag = outputTag;
-    }
-
-    @Override
-    public void init(ExecutorContext context) {
-        this.executorsBolt = context.getExecutorsBolt();
-    }
-
-    @Override
-    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-        executorsBolt.processExecutorElem(outputTag, elem);
-    }
-
-    @Override
-    public void cleanup() {}
-
-    @Override
-    public String toString() {
-        return description;
-    }
+  private final String description;
+  private final TupleTag outputTag;
+  private ExecutorsBolt executorsBolt;
+
+  public ViewExecutor(String description, TupleTag outputTag) {
+    this.description = description;
+    this.outputTag = outputTag;
+  }
+
+  @Override
+  public void init(ExecutorContext context) {
+    this.executorsBolt = context.getExecutorsBolt();
+  }
+
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    executorsBolt.processExecutorElem(outputTag, elem);
+  }
+
+  @Override
+  public void cleanup() {
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
index a6c3c16..7f21d26 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.collect.Iterables;
+import java.util.Collection;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -26,82 +29,79 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
 public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
-    private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
-
-    private final String description;
-    private WindowFn<T, W> windowFn;
-    private ExecutorsBolt executorsBolt;
-    private TupleTag outputTag;
-
-    class JStormAssignContext<InputT, W extends BoundedWindow>
-            extends WindowFn<InputT, W>.AssignContext {
-        private final WindowedValue<InputT> value;
-
-        JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
-            fn.super();
-            checkArgument(
-                    Iterables.size(value.getWindows()) == 1,
-                    String.format(
-                            "%s passed to window assignment must be in a single window, but it was in %s: %s",
-                            WindowedValue.class.getSimpleName(),
-                            Iterables.size(value.getWindows()),
-                            value.getWindows()));
-            this.value = value;
-        }
-
-        @Override
-        public InputT element() {
-            return value.getValue();
-        }
-
-        @Override
-        public Instant timestamp() {
-            return value.getTimestamp();
-        }
-
-        @Override
-        public BoundedWindow window() {
-            return Iterables.getOnlyElement(value.getWindows());
-        }
+  private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
+
+  private final String description;
+  private WindowFn<T, W> windowFn;
+  private ExecutorsBolt executorsBolt;
+  private TupleTag outputTag;
+
+  class JStormAssignContext<InputT, W extends BoundedWindow>
+      extends WindowFn<InputT, W>.AssignContext {
+    private final WindowedValue<InputT> value;
+
+    JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+      fn.super();
+      checkArgument(
+          Iterables.size(value.getWindows()) == 1,
+          String.format(
+              "%s passed to window assignment must be in a single window, but it was in %s: %s",
+              WindowedValue.class.getSimpleName(),
+              Iterables.size(value.getWindows()),
+              value.getWindows()));
+      this.value = value;
     }
 
-    public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
-        this.description = description;
-        this.windowFn = windowFn;
-        this.outputTag = outputTag;
+    @Override
+    public InputT element() {
+      return value.getValue();
     }
 
     @Override
-    public void init(ExecutorContext context) {
-        this.executorsBolt = context.getExecutorsBolt();
+    public Instant timestamp() {
+      return value.getTimestamp();
     }
 
     @Override
-    public void process(TupleTag tag, WindowedValue elem) {
-        Collection<W> windows = null;
-        try {
-            windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
-            for (W window: windows) {
-                executorsBolt.processExecutorElem(
-                        outputTag,
-                        WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to assign windows for elem=" + elem, e);
-        }
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(value.getWindows());
     }
+  }
+
+  public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
+    this.description = description;
+    this.windowFn = windowFn;
+    this.outputTag = outputTag;
+  }
+
+  @Override
+  public void init(ExecutorContext context) {
+    this.executorsBolt = context.getExecutorsBolt();
+  }
+
+  @Override
+  public void process(TupleTag tag, WindowedValue elem) {
+    Collection<W> windows = null;
+    try {
+      windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
+      for (W window : windows) {
+        executorsBolt.processExecutorElem(
+            outputTag,
+            WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to assign windows for elem=" + elem, e);
+    }
+  }
 
-    @Override
-    public void cleanup() {}
+  @Override
+  public void cleanup() {
+  }
 
 
-    @Override
-    public String toString() {
-        return description;
-    }
+  @Override
+  public String toString() {
+    return description;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
index eaf0549..1466f35 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,162 +17,161 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime.state;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.alibaba.jstorm.cache.ComposedKey;
 import com.alibaba.jstorm.cache.IKvStore;
 import com.alibaba.jstorm.cache.KvStoreIterable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.ReadableState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * JStorm implementation of {@link BagState}.
  */
 class JStormBagState<K, T> implements BagState<T> {
-    private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
-
-    @Nullable
-    private final K key;
-    private final StateNamespace namespace;
-    private final IKvStore<ComposedKey, T> kvState;
-    private final IKvStore<ComposedKey, Object> stateInfoKvState;
-    private int elemIndex;
-
-    public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
-                           IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
-        this.key = key;
-        this.namespace = checkNotNull(namespace, "namespace");
-        this.kvState = checkNotNull(kvState, "kvState");
-        this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
-
-        Integer index = (Integer) stateInfoKvState.get(getComposedKey());
-        this.elemIndex =  index != null ? ++index : 0;
+  private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
+
+  @Nullable
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<ComposedKey, T> kvState;
+  private final IKvStore<ComposedKey, Object> stateInfoKvState;
+  private int elemIndex;
+
+  public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
+                        IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+    this.key = key;
+    this.namespace = checkNotNull(namespace, "namespace");
+    this.kvState = checkNotNull(kvState, "kvState");
+    this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+
+    Integer index = (Integer) stateInfoKvState.get(getComposedKey());
+    this.elemIndex = index != null ? ++index : 0;
+  }
+
+  @Override
+  public void add(T input) {
+    try {
+      kvState.put(getComposedKey(elemIndex), input);
+      stateInfoKvState.put(getComposedKey(), elemIndex);
+      elemIndex++;
+    } catch (IOException e) {
+      throw new RuntimeException(e.getCause());
     }
-
-    @Override
-    public void add(T input) {
-        try {
-            kvState.put(getComposedKey(elemIndex), input);
-            stateInfoKvState.put(getComposedKey(), elemIndex);
-            elemIndex++;
-        } catch (IOException e) {
-            throw new RuntimeException(e.getCause());
-        }
+  }
+
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return new ReadableState<Boolean>() {
+      @Override
+      public Boolean read() {
+        return elemIndex <= 0;
+      }
+
+      @Override
+      public ReadableState<Boolean> readLater() {
+        // TODO: support prefetch.
+        return this;
+      }
+    };
+  }
+
+  @Override
+  public Iterable<T> read() {
+    return new BagStateIterable(elemIndex);
+  }
+
+  @Override
+  public BagState readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  @Override
+  public void clear() {
+    try {
+      for (int i = 0; i < elemIndex; i++) {
+        kvState.remove(getComposedKey(i));
+      }
+      stateInfoKvState.remove(getComposedKey());
+      elemIndex = 0;
+    } catch (IOException e) {
+      throw new RuntimeException(e.getCause());
     }
+  }
 
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-        return new ReadableState<Boolean>() {
-            @Override
-            public Boolean read() {
-                return elemIndex <= 0;
-            }
-
-            @Override
-            public ReadableState<Boolean> readLater() {
-                // TODO: support prefetch.
-                return this;
-            }
-        };
-    }
+  private ComposedKey getComposedKey() {
+    return ComposedKey.of(key, namespace);
+  }
 
-    @Override
-    public Iterable<T> read() {
-        return new BagStateIterable(elemIndex);
-    }
+  private ComposedKey getComposedKey(int elemIndex) {
+    return ComposedKey.of(key, namespace, elemIndex);
+  }
 
-    @Override
-    public BagState readLater() {
-        // TODO: support prefetch.
-        return this;
-    }
+  private class BagStateIterable implements KvStoreIterable<T> {
 
-    @Override
-    public void clear() {
+    private class BagStateIterator implements Iterator<T> {
+      private final int size;
+      private int cursor = 0;
+
+      BagStateIterator() {
+        Integer s = null;
         try {
-            for (int i = 0; i < elemIndex; i++) {
-                kvState.remove(getComposedKey(i));
-            }
-            stateInfoKvState.remove(getComposedKey());
-            elemIndex = 0;
+          s = (Integer) stateInfoKvState.get(getComposedKey());
         } catch (IOException e) {
-            throw new RuntimeException(e.getCause());
+          LOG.error("Failed to get elemIndex for key={}", getComposedKey());
+        }
+        this.size = s != null ? ++s : 0;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return cursor < size;
+      }
+
+      @Override
+      public T next() {
+        if (cursor >= size) {
+          throw new NoSuchElementException();
         }
-    }
-
-    private ComposedKey getComposedKey() {
-        return ComposedKey.of(key, namespace);
-    }
-
-    private ComposedKey getComposedKey(int elemIndex) {
-        return ComposedKey.of(key, namespace, elemIndex);
-    }
 
-    private class BagStateIterable implements KvStoreIterable<T> {
-
-        private class BagStateIterator implements Iterator<T> {
-            private final int size;
-            private int cursor = 0;
-
-            BagStateIterator() {
-                Integer s = null;
-                try {
-                    s = (Integer) stateInfoKvState.get(getComposedKey());
-                } catch (IOException e) {
-                    LOG.error("Failed to get elemIndex for key={}", getComposedKey());
-                }
-                this.size = s != null ? ++s : 0;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return cursor < size;
-            }
-
-            @Override
-            public T next() {
-                if (cursor >= size) {
-                    throw new NoSuchElementException();
-                }
-
-                T value = null;
-                try {
-                    value = kvState.get(getComposedKey(cursor));
-                } catch (IOException e) {
-                    LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
-                }
-                cursor++;
-                return value;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
+        T value = null;
+        try {
+          value = kvState.get(getComposedKey(cursor));
+        } catch (IOException e) {
+          LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
         }
+        cursor++;
+        return value;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
 
-        private final int size;
+    private final int size;
 
-        BagStateIterable(int size) {
-            this.size = size;
-        }
+    BagStateIterable(int size) {
+      this.size = size;
+    }
 
-        @Override
-        public Iterator<T> iterator() {
-            return new BagStateIterator();
-        }
+    @Override
+    public Iterator<T> iterator() {
+      return new BagStateIterator();
+    }
 
-        @Override
-        public String toString() {
-            return String.format("BagStateIterable: composedKey=%s", getComposedKey());
-        }
+    @Override
+    public String toString() {
+      return String.format("BagStateIterable: composedKey=%s", getComposedKey());
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
index b0fe29b..7c6a239 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,7 +20,6 @@ package org.apache.beam.runners.jstorm.translation.runtime.state;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.ReadableState;
@@ -30,59 +29,60 @@ import org.apache.beam.sdk.transforms.Combine;
  * JStorm implementation of {@link CombiningState}.
  */
 public class JStormCombiningState<InputT, AccumT, OutputT>
-        implements CombiningState<InputT, AccumT, OutputT> {
+    implements CombiningState<InputT, AccumT, OutputT> {
+
+  @Nullable
+  private final BagState<AccumT> accumBagState;
+  private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
 
-    @Nullable
-    private final BagState<AccumT> accumBagState;
-    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-    JStormCombiningState(
-            BagState<AccumT> accumBagState,
-            Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-        this.accumBagState = checkNotNull(accumBagState, "accumBagState");
-        this.combineFn = checkNotNull(combineFn, "combineFn");
-    }
+  JStormCombiningState(
+      BagState<AccumT> accumBagState,
+      Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+    this.accumBagState = checkNotNull(accumBagState, "accumBagState");
+    this.combineFn = checkNotNull(combineFn, "combineFn");
+  }
 
-    @Override
-    public AccumT getAccum() {
-        // TODO: replacing the accumBagState with the merged accum.
-        return combineFn.mergeAccumulators(accumBagState.read());
-    }
+  @Override
+  public AccumT getAccum() {
+    // TODO: replacing the accumBagState with the merged accum.
+    return combineFn.mergeAccumulators(accumBagState.read());
+  }
 
-    @Override
-    public void addAccum(AccumT accumT) {
-        accumBagState.add(accumT);
-    }
+  @Override
+  public void addAccum(AccumT accumT) {
+    accumBagState.add(accumT);
+  }
 
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
-        return combineFn.mergeAccumulators(iterable);
-    }
+  @Override
+  public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
+    return combineFn.mergeAccumulators(iterable);
+  }
 
-    @Override
-    public void add(InputT input) {
-        accumBagState.add(
-                combineFn.addInput(combineFn.createAccumulator(), input));
-    }
+  @Override
+  public void add(InputT input) {
+    accumBagState.add(
+        combineFn.addInput(combineFn.createAccumulator(), input));
+  }
 
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-        return accumBagState.isEmpty();
-    }
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return accumBagState.isEmpty();
+  }
 
-    @Override
-    public OutputT read() {
-        return combineFn.extractOutput(
-            combineFn.mergeAccumulators(accumBagState.read()));
-    }
+  @Override
+  public OutputT read() {
+    return combineFn.extractOutput(
+        combineFn.mergeAccumulators(accumBagState.read()));
+  }
 
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-        // TODO: support prefetch.
-        return this;
-    }
+  @Override
+  public CombiningState<InputT, AccumT, OutputT> readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
 
-    @Override
-    public void clear() {
-        accumBagState.clear();
-    }
+  @Override
+  public void clear() {
+    accumBagState.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
index f101beb..f1c1ed0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,137 +18,136 @@
 package org.apache.beam.runners.jstorm.translation.runtime.state;
 
 import com.alibaba.jstorm.cache.IKvStore;
+import java.io.IOException;
+import java.util.Map;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.sdk.state.MapState;
 import org.apache.beam.sdk.state.ReadableState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Map;
-
 public class JStormMapState<K, V> implements MapState<K, V> {
-    private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
-
-    private final K key;
-    private final StateNamespace namespace;
-    private IKvStore<K, V> kvStore;
-
-    public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
-        this.key = key;
-        this.namespace = namespace;
-        this.kvStore = kvStore;
+  private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
+
+  private final K key;
+  private final StateNamespace namespace;
+  private IKvStore<K, V> kvStore;
+
+  public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+    this.key = key;
+    this.namespace = namespace;
+    this.kvStore = kvStore;
+  }
+
+  @Override
+  public void put(K var1, V var2) {
+    try {
+      kvStore.put(var1, var2);
+    } catch (IOException e) {
+      reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
     }
-
-    @Override
-    public void put(K var1, V var2) {
-        try {
-            kvStore.put(var1, var2);
-        } catch (IOException e) {
-            reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
-        }
+  }
+
+  @Override
+  public ReadableState<V> putIfAbsent(K var1, V var2) {
+    ReadableState<V> ret = null;
+    try {
+      V value = kvStore.get(var1);
+      if (value == null) {
+        kvStore.put(var1, var2);
+        ret = new MapReadableState<>(null);
+      } else {
+        ret = new MapReadableState<>(value);
+      }
+    } catch (IOException e) {
+      reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
     }
-
-    @Override
-    public ReadableState<V> putIfAbsent(K var1, V var2) {
-        ReadableState<V> ret = null;
-        try {
-            V value = kvStore.get(var1);
-            if (value == null) {
-                kvStore.put(var1, var2);
-                ret = new MapReadableState<>(null);
-            } else {
-                ret = new MapReadableState<>(value);
-            }
-        } catch (IOException e) {
-            reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
-        }
-        return ret;
+    return ret;
+  }
+
+  @Override
+  public void remove(K var1) {
+    try {
+      kvStore.remove(var1);
+    } catch (IOException e) {
+      reportError(String.format("Failed to remove key=%s", var1), e);
     }
-
-    @Override
-    public void remove(K var1) {
-        try {
-            kvStore.remove(var1);
-        } catch (IOException e) {
-            reportError(String.format("Failed to remove key=%s", var1), e);
-        }
+  }
+
+  @Override
+  public ReadableState<V> get(K var1) {
+    ReadableState<V> ret = new MapReadableState<>(null);
+    try {
+      ret = new MapReadableState(kvStore.get(var1));
+    } catch (IOException e) {
+      reportError(String.format("Failed to get value for key=%s", var1), e);
     }
-
-    @Override
-    public ReadableState<V> get(K var1) {
-        ReadableState<V> ret = new MapReadableState<>(null);
-        try {
-            ret = new MapReadableState(kvStore.get(var1));
-        } catch (IOException e) {
-            reportError(String.format("Failed to get value for key=%s", var1), e);
-        }
-        return ret;
+    return ret;
+  }
+
+  @Override
+  public ReadableState<Iterable<K>> keys() {
+    ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
+    try {
+      ret = new MapReadableState<>(kvStore.keys());
+    } catch (IOException e) {
+      reportError(String.format("Failed to get keys"), e);
     }
-
-    @Override
-    public ReadableState<Iterable<K>> keys() {
-        ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
-        try {
-            ret = new MapReadableState<>(kvStore.keys());
-        } catch (IOException e) {
-            reportError(String.format("Failed to get keys"), e);
-        }
-        return ret;
+    return ret;
+  }
+
+  @Override
+  public ReadableState<Iterable<V>> values() {
+    ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
+    try {
+      ret = new MapReadableState<>(kvStore.values());
+    } catch (IOException e) {
+      reportError(String.format("Failed to get values"), e);
     }
-
-    @Override
-    public ReadableState<Iterable<V>> values() {
-        ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
-        try {
-            ret = new MapReadableState<>(kvStore.values());
-        } catch (IOException e) {
-            reportError(String.format("Failed to get values"), e);
-        }
-        return ret;
-    }
-
-    @Override
-    public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
-        ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
-        try {
-            ret = new MapReadableState<>(kvStore.entries());
-        } catch (IOException e) {
-            reportError(String.format("Failed to get values"), e);
-        }
-        return ret;
+    return ret;
+  }
+
+  @Override
+  public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+    ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
+    try {
+      ret = new MapReadableState<>(kvStore.entries());
+    } catch (IOException e) {
+      reportError(String.format("Failed to get values"), e);
     }
-
-    @Override
-    public void clear() {
-        try {
-            Iterable<K> keys = kvStore.keys();
-            kvStore.removeBatch(keys);
-        } catch (IOException e) {
-            reportError(String.format("Failed to clear map state"), e);
-        }
+    return ret;
+  }
+
+  @Override
+  public void clear() {
+    try {
+      Iterable<K> keys = kvStore.keys();
+      kvStore.removeBatch(keys);
+    } catch (IOException e) {
+      reportError(String.format("Failed to clear map state"), e);
     }
+  }
 
-    private void reportError(String errorInfo, IOException e) {
-        LOG.error(errorInfo, e);
-        throw new RuntimeException(errorInfo);
-    }
+  private void reportError(String errorInfo, IOException e) {
+    LOG.error(errorInfo, e);
+    throw new RuntimeException(errorInfo);
+  }
 
-    private class MapReadableState<T> implements ReadableState<T> {
-        private T value;
+  private class MapReadableState<T> implements ReadableState<T> {
+    private T value;
 
-        public MapReadableState(T value) {
-            this.value = value;
-        }
+    public MapReadableState(T value) {
+      this.value = value;
+    }
 
-        @Override
-        public T read() {
-            return value;
-        }
+    @Override
+    public T read() {
+      return value;
+    }
 
-        @Override
-        public ReadableState<T> readLater() {
-            return this;
-        }
+    @Override
+    public ReadableState<T> readLater() {
+      return this;
     }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
index 8a0cb73..80ef3a2 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,13 +17,16 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime.state;
 
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.alibaba.jstorm.cache.ComposedKey;
 import com.alibaba.jstorm.cache.IKvStoreManager;
-
+import java.io.IOException;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
@@ -41,151 +44,148 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.joda.time.Instant;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * JStorm implementation of {@link StateInternals}.
  */
 public class JStormStateInternals<K> implements StateInternals {
 
-    private static final String STATE_INFO = "state-info:";
-
-    @Nullable
-    private final K key;
-    private final IKvStoreManager kvStoreManager;
-    private final TimerService timerService;
-    private final int executorId;
-
-    public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
-                                TimerService timerService, int executorId) {
-        this.key = key;
-        this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
-        this.timerService = checkNotNull(timerService, "timerService");
-        this.executorId = executorId;
-    }
-
-    @Nullable
-    @Override
-    public K getKey() {
-        return key;
-    }
-
-    @Override
-    public <T extends State> T state(
-        StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
-        // throw new UnsupportedOperationException("StateContext is not supported.");
-        /**
-         * TODO:
-         * Same implementation as state() which is without StateContext. This might be updated after
-         * we figure out if we really need StateContext for JStorm state internals.
-         */
-        return state(namespace, address);
-    }
-
-    @Override
-    public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
-        return address.getSpec().bind(address.getId(), new StateBinder() {
-            @Override
-            public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
-                try {
-                    return new JStormValueState<>(
-                            getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
-                } catch (IOException e) {
-                    throw new RuntimeException();
-                }
-            }
-
-            @Override
-            public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
-                try {
-                    return new JStormBagState(
-                            getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
-                            kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-                } catch (IOException e) {
-                    throw new RuntimeException();
+  private static final String STATE_INFO = "state-info:";
+
+  @Nullable
+  private final K key;
+  private final IKvStoreManager kvStoreManager;
+  private final TimerService timerService;
+  private final int executorId;
+
+  public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
+                              TimerService timerService, int executorId) {
+    this.key = key;
+    this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
+    this.timerService = checkNotNull(timerService, "timerService");
+    this.executorId = executorId;
+  }
+
+  @Nullable
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  @Override
+  public <T extends State> T state(
+      StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
+    // throw new UnsupportedOperationException("StateContext is not supported.");
+    /**
+     * TODO:
+     * Same implementation as state() which is without StateContext. This might be updated after
+     * we figure out if we really need StateContext for JStorm state internals.
+     */
+    return state(namespace, address);
+  }
+
+  @Override
+  public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
+    return address.getSpec().bind(address.getId(), new StateBinder() {
+      @Override
+      public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
+        try {
+          return new JStormValueState<>(
+              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
+        } catch (IOException e) {
+          throw new RuntimeException();
+        }
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
+        try {
+          return new JStormBagState(
+              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
+              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+        } catch (IOException e) {
+          throw new RuntimeException();
+        }
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          String id,
+          StateSpec<MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder,
+          Coder<ValueT> mapValueCoder) {
+        try {
+          return new JStormMapState<>(
+              getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT> CombiningState bindCombining(
+          String id,
+          StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+        try {
+          BagState<AccumT> accumBagState = new JStormBagState(
+              getKey(), namespace,
+              kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
+              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+          return new JStormCombiningState<>(accumBagState, combineFn);
+        } catch (IOException e) {
+          throw new RuntimeException();
+        }
+      }
+
+
+      @Override
+      public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
+      bindCombiningWithContext(
+          String id,
+          StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
+          CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public WatermarkHoldState bindWatermark(
+          String id,
+          StateSpec<WatermarkHoldState> spec,
+          final TimestampCombiner timestampCombiner) {
+        try {
+          BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
+              getKey(), namespace,
+              kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
+              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+
+          Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
+              new BinaryCombineFn<Instant>() {
+                @Override
+                public Instant apply(Instant left, Instant right) {
+                  return timestampCombiner.combine(left, right);
                 }
-            }
-
-            @Override
-            public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-                String id,
-                StateSpec<MapState<KeyT, ValueT>> spec,
-                Coder<KeyT> mapKeyCoder,
-                Coder<ValueT> mapValueCoder) {
-                try {
-                    return new JStormMapState<>(getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            @Override
-            public <InputT, AccumT, OutputT> CombiningState bindCombining(
-                    String id,
-                    StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
-                    Coder<AccumT> accumCoder,
-                    Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-                try {
-                    BagState<AccumT> accumBagState = new JStormBagState(
-                            getKey(), namespace,
-                            kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
-                            kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-                    return new JStormCombiningState<>(accumBagState, combineFn);
-                } catch (IOException e) {
-                    throw new RuntimeException();
-                }
-            }
-
-
-            @Override
-            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-            bindCombiningWithContext(
-                String id,
-                StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
-                CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public WatermarkHoldState bindWatermark(
-                String id,
-                StateSpec<WatermarkHoldState> spec,
-                final TimestampCombiner timestampCombiner) {
-                try {
-                    BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
-                            getKey(), namespace,
-                            kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
-                            kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-
-                    Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
-                            new BinaryCombineFn<Instant>() {
-                                @Override
-                                public Instant apply(Instant left, Instant right) {
-                                  return timestampCombiner.combine(left, right);
-                                }};
-                    return new JStormWatermarkHoldState(
-                            namespace,
-                            new JStormCombiningState<>(
-                                    accumBagState,
-                                    outputTimeCombineFn),
-                            timestampCombiner,
-                            timerService);
-                } catch (IOException e) {
-                    throw new RuntimeException();
-                }
-            }
-        });
-    }
-
-    private String getStoreId(String stateId) {
-        return String.format("%s-%s", stateId, executorId);
-    }
+              };
+          return new JStormWatermarkHoldState(
+              namespace,
+              new JStormCombiningState<>(
+                  accumBagState,
+                  outputTimeCombineFn),
+              timestampCombiner,
+              timerService);
+        } catch (IOException e) {
+          throw new RuntimeException();
+        }
+      }
+    });
+  }
+
+  private String getStoreId(String stateId) {
+    return String.format("%s-%s", stateId, executorId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
index 5ad3663..79ff6b4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,66 +19,64 @@ package org.apache.beam.runners.jstorm.translation.runtime.state;
 
 import com.alibaba.jstorm.cache.ComposedKey;
 import com.alibaba.jstorm.cache.IKvStore;
-
+import java.io.IOException;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.sdk.state.ValueState;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-
 /**
  * JStorm implementation of {@link ValueState}.
  */
 public class JStormValueState<K, T> implements ValueState<T> {
 
-    @Nullable
-    private final K key;
-    private final StateNamespace namespace;
-    private final IKvStore<ComposedKey, T> kvState;
+  @Nullable
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<ComposedKey, T> kvState;
 
-    JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
-        this.key = key;
-        this.namespace = namespace;
-        this.kvState = kvState;
-    }
+  JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+    this.key = key;
+    this.namespace = namespace;
+    this.kvState = kvState;
+  }
 
-    @Override
-    public void write(T t) {
-        try {
-            kvState.put(getComposedKey(), t);
-        } catch (IOException e) {
-            throw new RuntimeException(String.format(
-                    "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
-        }
+  @Override
+  public void write(T t) {
+    try {
+      kvState.put(getComposedKey(), t);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
     }
+  }
 
-    @Override
-    public T read() {
-        try {
-            return kvState.get(getComposedKey());
-        } catch (IOException e) {
-            throw new RuntimeException(String.format(
-                    "Failed to read key: %s, namespace: %s.", key, namespace));
-        }
+  @Override
+  public T read() {
+    try {
+      return kvState.get(getComposedKey());
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to read key: %s, namespace: %s.", key, namespace));
     }
+  }
 
-    @Override
-    public ValueState<T> readLater() {
-        // TODO: support prefetch.
-        return this;
-    }
+  @Override
+  public ValueState<T> readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
 
-    @Override
-    public void clear() {
-        try {
-            kvState.remove(getComposedKey());
-        } catch (IOException e) {
-            throw new RuntimeException(String.format(
-                    "Failed to clear key: %s, namespace: %s.", key, namespace));
-        }
+  @Override
+  public void clear() {
+    try {
+      kvState.remove(getComposedKey());
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to clear key: %s, namespace: %s.", key, namespace));
     }
+  }
 
-    private ComposedKey getComposedKey() {
-        return ComposedKey.of(key, namespace);
-    }
+  private ComposedKey getComposedKey() {
+    return ComposedKey.of(key, namespace);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
index 659d77c..dc3ba43 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,8 +19,8 @@ package org.apache.beam.runners.jstorm.translation.runtime.state;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
 import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
 import org.apache.beam.sdk.state.GroupingState;
 import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.WatermarkHoldState;
@@ -32,52 +32,52 @@ import org.joda.time.Instant;
  */
 public class JStormWatermarkHoldState implements WatermarkHoldState {
 
-    private final StateNamespace namespace;
-    private final GroupingState<Instant, Instant> watermarkHoldsState;
-    private final TimestampCombiner timestampCombiner;
-    private final TimerService timerService;
+  private final StateNamespace namespace;
+  private final GroupingState<Instant, Instant> watermarkHoldsState;
+  private final TimestampCombiner timestampCombiner;
+  private final TimerService timerService;
 
-    JStormWatermarkHoldState(
-            StateNamespace namespace,
-            GroupingState<Instant, Instant> watermarkHoldsState,
-            TimestampCombiner timestampCombiner,
-            TimerService timerService) {
-        this.namespace = checkNotNull(namespace, "namespace");
-        this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
-        this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
-        this.timerService = checkNotNull(timerService, "timerService");
-    }
+  JStormWatermarkHoldState(
+      StateNamespace namespace,
+      GroupingState<Instant, Instant> watermarkHoldsState,
+      TimestampCombiner timestampCombiner,
+      TimerService timerService) {
+    this.namespace = checkNotNull(namespace, "namespace");
+    this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
+    this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
+    this.timerService = checkNotNull(timerService, "timerService");
+  }
 
-    @Override
-    public TimestampCombiner getTimestampCombiner() {
-        return timestampCombiner;
-    }
+  @Override
+  public TimestampCombiner getTimestampCombiner() {
+    return timestampCombiner;
+  }
 
-    @Override
-    public void add(Instant instant) {
-        timerService.addWatermarkHold(namespace.stringKey(), instant);
-        watermarkHoldsState.add(instant);
-    }
+  @Override
+  public void add(Instant instant) {
+    timerService.addWatermarkHold(namespace.stringKey(), instant);
+    watermarkHoldsState.add(instant);
+  }
 
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-        return watermarkHoldsState.isEmpty();
-    }
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return watermarkHoldsState.isEmpty();
+  }
 
-    @Override
-    public Instant read() {
-        return watermarkHoldsState.read();
-    }
+  @Override
+  public Instant read() {
+    return watermarkHoldsState.read();
+  }
 
-    @Override
-    public WatermarkHoldState readLater() {
-        // TODO: support prefetch.
-        return this;
-    }
+  @Override
+  public WatermarkHoldState readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
 
-    @Override
-    public void clear() {
-        timerService.clearWatermarkHold(namespace.stringKey());
-        watermarkHoldsState.clear();
-    }
+  @Override
+  public void clear() {
+    timerService.clearWatermarkHold(namespace.stringKey());
+    watermarkHoldsState.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
index 4b5f83c..184a957 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,83 +17,84 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime.timer;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
 import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.joda.time.Instant;
 
-import javax.annotation.Nullable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * JStorm implementation of {@link TimerInternals}.
  */
 public class JStormTimerInternals<K> implements TimerInternals {
 
-    private final K key;
-    private final DoFnExecutor<?, ?> doFnExecutor;
-    private final TimerService timerService;
-
-
-    public JStormTimerInternals(@Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
-        this.key = key;
-        this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
-        this.timerService = checkNotNull(timerService, "timerService");
-    }
-
-    @Override
-    public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
-        setTimer(TimerData.of(timerId, namespace, target, timeDomain));
-    }
-
-    @Override
-    @Deprecated
-    public void setTimer(TimerData timerData) {
-        timerService.setTimer(key, timerData, doFnExecutor);
-    }
-
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-        throw new UnsupportedOperationException(
-                "Canceling of a timer is not yet supported.");
-    }
-
-    @Override
-    @Deprecated
-    public void deleteTimer(StateNamespace namespace, String timerId) {
-        throw new UnsupportedOperationException(
-                "Canceling of a timer is not yet supported.");
-    }
-
-    @Override
-    @Deprecated
-    public void deleteTimer(TimerData timerData) {
-        throw new UnsupportedOperationException(
-                "Canceling of a timer is not yet supported.");
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-        return Instant.now();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-        return null;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-        return new Instant(timerService.currentInputWatermark());
-    }
-
-    @Override
-    @Nullable
-    public Instant currentOutputWatermarkTime() {
-        return new Instant(timerService.currentOutputWatermark());
-    }
+  private final K key;
+  private final DoFnExecutor<?, ?> doFnExecutor;
+  private final TimerService timerService;
+
+
+  public JStormTimerInternals(
+      @Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
+    this.key = key;
+    this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
+    this.timerService = checkNotNull(timerService, "timerService");
+  }
+
+  @Override
+  public void setTimer(
+      StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+    setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+  }
+
+  @Override
+  @Deprecated
+  public void setTimer(TimerData timerData) {
+    timerService.setTimer(key, timerData, doFnExecutor);
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException(
+        "Canceling of a timer is not yet supported.");
+  }
+
+  @Override
+  @Deprecated
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException(
+        "Canceling of a timer is not yet supported.");
+  }
+
+  @Override
+  @Deprecated
+  public void deleteTimer(TimerData timerData) {
+    throw new UnsupportedOperationException(
+        "Canceling of a timer is not yet supported.");
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return Instant.now();
+  }
+
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return null;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return new Instant(timerService.currentInputWatermark());
+  }
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return new Instant(timerService.currentOutputWatermark());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
index 9651fc2..7e7a54a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
@@ -17,10 +17,9 @@
  */
 package org.apache.beam.runners.jstorm.translation.translator;
 
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-
-import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
@@ -33,18 +32,20 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 public class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
 
-    @Override
-    public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+  @Override
+  public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
 
-        TupleTag<?> outputTag = userGraphContext.getOutputTag();
-        PValue outputValue = userGraphContext.getOutput();
-        UnboundedSourceSpout spout = new UnboundedSourceSpout(
-                description,
-                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
-                userGraphContext.getOptions(), outputTag);
+    TupleTag<?> outputTag = userGraphContext.getOutputTag();
+    PValue outputValue = userGraphContext.getOutput();
+    UnboundedSourceSpout spout = new UnboundedSourceSpout(
+        description,
+        new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
+        userGraphContext.getOptions(), outputTag);
 
-        context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(outputTag, outputValue));
-    }
+    context.getExecutionGraphContext().registerSpout(
+        spout, TaggedPValue.of(outputTag, outputValue));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
index c4da58a..fe5fca9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.jstorm.translation.translator;
 
 import org.apache.beam.sdk.transforms.Combine;
 
-public class CombineGloballyTranslator<InputT, OutputT> extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
-    
+public class CombineGloballyTranslator<InputT, OutputT>
+    extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
index 99cbff7..c382fb7 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.jstorm.translation.translator;
 
 import org.apache.beam.sdk.transforms.Combine;
 
-public class CombinePerKeyTranslator<K, InputT, OutputT> extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
-    
+public class CombinePerKeyTranslator<K, InputT, OutputT>
+    extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
index 4558216..bf8d472 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
@@ -18,32 +18,30 @@
 package org.apache.beam.runners.jstorm.translation.translator;
 
 import com.google.common.collect.Maps;
-import org.apache.beam.sdk.transforms.Flatten;
-
+import java.util.Map;
 import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.runners.jstorm.translation.runtime.FlattenExecutor;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
-import java.util.Map;
-
 public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
 
-    @Override
-    public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+  @Override
+  public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
 
-        // Since a new tag is created in PCollectionList, retrieve the real tag here.
-        Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
-        for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
-            PCollection<V> pc = (PCollection<V>) entry.getValue();
-            inputs.putAll(pc.expand());
-        }
-        System.out.println("Real inputs: " + inputs);
-        System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
-        String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
-        FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
-        context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+    // Since a new tag is created in PCollectionList, retrieve the real tag here.
+    Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
+    for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
+      PCollection<V> pc = (PCollection<V>) entry.getValue();
+      inputs.putAll(pc.expand());
     }
+    System.out.println("Real inputs: " + inputs);
+    System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
+    String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
+    FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+    context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
index 6b8297b..85f96ce 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
@@ -17,53 +17,52 @@
  */
 package org.apache.beam.runners.jstorm.translation.translator;
 
-import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
 import com.google.common.collect.Lists;
-import org.apache.beam.sdk.transforms.GroupByKey;
-
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.Collections;
-import java.util.List;
+import org.apache.beam.sdk.values.WindowingStrategy;
 
 public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
-    // information of transform
-    protected PCollection<KV<K, V>> input;
-    protected PCollection<KV<K, Iterable<V>>> output;
-    protected List<TupleTag<?>> inputTags;
-    protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
-    protected List<TupleTag<?>> sideOutputTags;
-    protected List<PCollectionView<?>> sideInputs;
-    protected WindowingStrategy<?, ?> windowingStrategy;
+  // information of transform
+  protected PCollection<KV<K, V>> input;
+  protected PCollection<KV<K, Iterable<V>>> output;
+  protected List<TupleTag<?>> inputTags;
+  protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
+  protected List<TupleTag<?>> sideOutputTags;
+  protected List<PCollectionView<?>> sideInputs;
+  protected WindowingStrategy<?, ?> windowingStrategy;
 
-    @Override
-    public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+  @Override
+  public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
 
-        input = (PCollection<KV<K, V>>) userGraphContext.getInput();
-        output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
+    input = (PCollection<KV<K, V>>) userGraphContext.getInput();
+    output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
 
-        inputTags = userGraphContext.getInputTags();
-        mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
-        sideOutputTags = Lists.newArrayList();
+    inputTags = userGraphContext.getInputTags();
+    mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
+    sideOutputTags = Lists.newArrayList();
 
-        sideInputs = Collections.<PCollectionView<?>>emptyList();
-        windowingStrategy = input.getWindowingStrategy();
+    sideInputs = Collections.<PCollectionView<?>>emptyList();
+    windowingStrategy = input.getWindowingStrategy();
 
-        GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
-                userGraphContext.getStepName(),
-                description,
-                context,
-                context.getUserGraphContext().getOptions(),
-                windowingStrategy,
-                mainOutputTag,
-                sideOutputTags);
-        context.addTransformExecutor(groupByWindowExecutor);
-    }
+    GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
+        userGraphContext.getStepName(),
+        description,
+        context,
+        context.getUserGraphContext().getOptions(),
+        windowingStrategy,
+        mainOutputTag,
+        sideOutputTags);
+    context.addTransformExecutor(groupByWindowExecutor);
+  }
 }