You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:27 UTC

[27/53] [abbrv] beam git commit: jstorm-runner: move jstorm state implementations to JStormStateInternals inner classes.

jstorm-runner: move jstorm state implementations to JStormStateInternals inner classes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9abbbd06
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9abbbd06
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9abbbd06

Branch: refs/heads/jstorm-runner
Commit: 9abbbd064e878a961ff3e8fc62d96ea650fd7570
Parents: 8cdd41b
Author: Pei He <pe...@apache.org>
Authored: Fri Jul 14 16:10:29 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:57 2017 +0800

----------------------------------------------------------------------
 .../jstorm/translation/JStormBagState.java      | 180 -------
 .../translation/JStormCombiningState.java       |  88 ----
 .../jstorm/translation/JStormMapState.java      | 158 -------
 .../translation/JStormStateInternals.java       | 464 +++++++++++++++++++
 .../jstorm/translation/JStormValueState.java    |  82 ----
 .../translation/JStormWatermarkHoldState.java   |  82 ----
 6 files changed, 464 insertions(+), 590 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
deleted file mode 100644
index 3e5d52b..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.KvStoreIterable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link BagState} in JStorm runner.
- */
-class JStormBagState<K, T> implements BagState<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
-
-  @Nullable
-  private final K key;
-  private final StateNamespace namespace;
-  private final IKvStore<ComposedKey, T> kvState;
-  private final IKvStore<ComposedKey, Object> stateInfoKvState;
-  private int elemIndex;
-
-  public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
-                        IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
-    this.key = key;
-    this.namespace = checkNotNull(namespace, "namespace");
-    this.kvState = checkNotNull(kvState, "kvState");
-    this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
-
-    Integer index = (Integer) stateInfoKvState.get(getComposedKey());
-    this.elemIndex = index != null ? ++index : 0;
-  }
-
-  @Override
-  public void add(T input) {
-    try {
-      kvState.put(getComposedKey(elemIndex), input);
-      stateInfoKvState.put(getComposedKey(), elemIndex);
-      elemIndex++;
-    } catch (IOException e) {
-      throw new RuntimeException(e.getCause());
-    }
-  }
-
-  @Override
-  public ReadableState<Boolean> isEmpty() {
-    return new ReadableState<Boolean>() {
-      @Override
-      public Boolean read() {
-        return elemIndex <= 0;
-      }
-
-      @Override
-      public ReadableState<Boolean> readLater() {
-        // TODO: support prefetch.
-        return this;
-      }
-    };
-  }
-
-  @Override
-  public Iterable<T> read() {
-    return new BagStateIterable(elemIndex);
-  }
-
-  @Override
-  public BagState readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  @Override
-  public void clear() {
-    try {
-      for (int i = 0; i < elemIndex; i++) {
-        kvState.remove(getComposedKey(i));
-      }
-      stateInfoKvState.remove(getComposedKey());
-      elemIndex = 0;
-    } catch (IOException e) {
-      throw new RuntimeException(e.getCause());
-    }
-  }
-
-  private ComposedKey getComposedKey() {
-    return ComposedKey.of(key, namespace);
-  }
-
-  private ComposedKey getComposedKey(int elemIndex) {
-    return ComposedKey.of(key, namespace, elemIndex);
-  }
-
-  /**
-   * Implementation of Bag state Iterable.
-   */
-  private class BagStateIterable implements KvStoreIterable<T> {
-
-    private class BagStateIterator implements Iterator<T> {
-      private final int size;
-      private int cursor = 0;
-
-      BagStateIterator() {
-        Integer s = null;
-        try {
-          s = (Integer) stateInfoKvState.get(getComposedKey());
-        } catch (IOException e) {
-          LOG.error("Failed to get elemIndex for key={}", getComposedKey());
-        }
-        this.size = s != null ? ++s : 0;
-      }
-
-      @Override
-      public boolean hasNext() {
-        return cursor < size;
-      }
-
-      @Override
-      public T next() {
-        if (cursor >= size) {
-          throw new NoSuchElementException();
-        }
-
-        T value = null;
-        try {
-          value = kvState.get(getComposedKey(cursor));
-        } catch (IOException e) {
-          LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
-        }
-        cursor++;
-        return value;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    }
-
-    private final int size;
-
-    BagStateIterable(int size) {
-      this.size = size;
-    }
-
-    @Override
-    public Iterator<T> iterator() {
-      return new BagStateIterator();
-    }
-
-    @Override
-    public String toString() {
-      return String.format("BagStateIterable: composedKey=%s", getComposedKey());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
deleted file mode 100644
index 6bd021f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.transforms.Combine;
-
-/**
- * JStorm implementation of {@link CombiningState}.
- */
-class JStormCombiningState<InputT, AccumT, OutputT>
-    implements CombiningState<InputT, AccumT, OutputT> {
-
-  @Nullable
-  private final BagState<AccumT> accumBagState;
-  private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-
-  JStormCombiningState(
-      BagState<AccumT> accumBagState,
-      Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-    this.accumBagState = checkNotNull(accumBagState, "accumBagState");
-    this.combineFn = checkNotNull(combineFn, "combineFn");
-  }
-
-  @Override
-  public AccumT getAccum() {
-    // TODO: replacing the accumBagState with the merged accum.
-    return combineFn.mergeAccumulators(accumBagState.read());
-  }
-
-  @Override
-  public void addAccum(AccumT accumT) {
-    accumBagState.add(accumT);
-  }
-
-  @Override
-  public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
-    return combineFn.mergeAccumulators(iterable);
-  }
-
-  @Override
-  public void add(InputT input) {
-    accumBagState.add(
-        combineFn.addInput(combineFn.createAccumulator(), input));
-  }
-
-  @Override
-  public ReadableState<Boolean> isEmpty() {
-    return accumBagState.isEmpty();
-  }
-
-  @Override
-  public OutputT read() {
-    return combineFn.extractOutput(
-        combineFn.mergeAccumulators(accumBagState.read()));
-  }
-
-  @Override
-  public CombiningState<InputT, AccumT, OutputT> readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  @Override
-  public void clear() {
-    accumBagState.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
deleted file mode 100644
index 6a4e376..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import com.alibaba.jstorm.cache.IKvStore;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link MapState} in JStorm runner.
- * @param <K>
- * @param <V>
- */
-class JStormMapState<K, V> implements MapState<K, V> {
-  private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
-
-  private final K key;
-  private final StateNamespace namespace;
-  private IKvStore<K, V> kvStore;
-
-  public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
-    this.key = key;
-    this.namespace = namespace;
-    this.kvStore = kvStore;
-  }
-
-  @Override
-  public void put(K var1, V var2) {
-    try {
-      kvStore.put(var1, var2);
-    } catch (IOException e) {
-      reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
-    }
-  }
-
-  @Override
-  public ReadableState<V> putIfAbsent(K var1, V var2) {
-    ReadableState<V> ret = null;
-    try {
-      V value = kvStore.get(var1);
-      if (value == null) {
-        kvStore.put(var1, var2);
-        ret = new MapReadableState<>(null);
-      } else {
-        ret = new MapReadableState<>(value);
-      }
-    } catch (IOException e) {
-      reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public void remove(K var1) {
-    try {
-      kvStore.remove(var1);
-    } catch (IOException e) {
-      reportError(String.format("Failed to remove key=%s", var1), e);
-    }
-  }
-
-  @Override
-  public ReadableState<V> get(K var1) {
-    ReadableState<V> ret = new MapReadableState<>(null);
-    try {
-      ret = new MapReadableState(kvStore.get(var1));
-    } catch (IOException e) {
-      reportError(String.format("Failed to get value for key=%s", var1), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public ReadableState<Iterable<K>> keys() {
-    ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
-    try {
-      ret = new MapReadableState<>(kvStore.keys());
-    } catch (IOException e) {
-      reportError(String.format("Failed to get keys"), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public ReadableState<Iterable<V>> values() {
-    ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
-    try {
-      ret = new MapReadableState<>(kvStore.values());
-    } catch (IOException e) {
-      reportError(String.format("Failed to get values"), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
-    ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
-    try {
-      ret = new MapReadableState<>(kvStore.entries());
-    } catch (IOException e) {
-      reportError(String.format("Failed to get values"), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public void clear() {
-    try {
-      Iterable<K> keys = kvStore.keys();
-      kvStore.removeBatch(keys);
-    } catch (IOException e) {
-      reportError(String.format("Failed to clear map state"), e);
-    }
-  }
-
-  private void reportError(String errorInfo, IOException e) {
-    LOG.error(errorInfo, e);
-    throw new RuntimeException(errorInfo);
-  }
-
-  private class MapReadableState<T> implements ReadableState<T> {
-    private T value;
-
-    public MapReadableState(T value) {
-      this.value = value;
-    }
-
-    @Override
-    public T read() {
-      return value;
-    }
-
-    @Override
-    public ReadableState<T> readLater() {
-      return this;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 78882f2..3b6b4d5 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -20,8 +20,13 @@ package org.apache.beam.runners.jstorm.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
 import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreIterable;
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
@@ -29,7 +34,9 @@ import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.GroupingState;
 import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateBinder;
@@ -42,12 +49,16 @@ import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * JStorm implementation of {@link StateInternals}.
  */
 class JStormStateInternals<K> implements StateInternals {
 
+  private static final Logger LOG = LoggerFactory.getLogger(JStormStateInternals.class);
+
   private static final String STATE_INFO = "state-info:";
 
   @Nullable
@@ -183,6 +194,459 @@ class JStormStateInternals<K> implements StateInternals {
     });
   }
 
+  /**
+   * {@link ValueState} stored as one {@link IKvStore} entry under (key, namespace).
+   */
+  private static class JStormValueState<K, T> implements ValueState<T> {
+
+    @Nullable
+    private final K key;
+    private final StateNamespace namespace;
+    private final IKvStore<ComposedKey, T> kvState;
+
+    JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+      this.key = key;
+      this.namespace = namespace;
+      this.kvState = kvState;
+    }
+
+    @Override
+    public void write(T t) {
+      try {
+        kvState.put(getComposedKey(), t);
+      } catch (IOException e) { // rethrown unchecked; e is kept as the cause
+        throw new RuntimeException(String.format(
+            "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t), e);
+      }
+    }
+
+    @Override
+    public T read() {
+      try {
+        return kvState.get(getComposedKey());
+      } catch (IOException e) { // rethrown unchecked; e is kept as the cause
+        throw new RuntimeException(String.format(
+            "Failed to read key: %s, namespace: %s.", key, namespace), e);
+      }
+    }
+
+    @Override
+    public ValueState<T> readLater() {
+      // TODO: support prefetch; until then this simply returns the same state object.
+      return this;
+    }
+
+    @Override
+    public void clear() {
+      try {
+        kvState.remove(getComposedKey());
+      } catch (IOException e) { // rethrown unchecked; e is kept as the cause
+        throw new RuntimeException(String.format(
+            "Failed to clear key: %s, namespace: %s.", key, namespace), e);
+      }
+    }
+
+    private ComposedKey getComposedKey() {
+      return ComposedKey.of(key, namespace);
+    }
+  }
+
+  /**
+   * {@link BagState} storing each element under (key, namespace, index) in an {@link IKvStore}.
+   */
+  private static class JStormBagState<K, T> implements BagState<T> {
+
+    @Nullable
+    private final K key;
+    private final StateNamespace namespace;
+    private final IKvStore<ComposedKey, T> kvState;
+    private final IKvStore<ComposedKey, Object> stateInfoKvState;
+    private int elemIndex; // index the next add() writes to; 0 means the bag is empty
+
+    JStormBagState(
+        @Nullable K key,
+        StateNamespace namespace,
+        IKvStore<ComposedKey, T> kvState,
+        IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+      this.key = key;
+      this.namespace = checkNotNull(namespace, "namespace");
+      this.kvState = checkNotNull(kvState, "kvState");
+      this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+
+      Integer index = (Integer) stateInfoKvState.get(getComposedKey());
+      this.elemIndex = index != null ? index + 1 : 0; // stored value is the last used index
+    }
+
+    @Override
+    public void add(T input) {
+      try {
+        kvState.put(getComposedKey(elemIndex), input);
+        stateInfoKvState.put(getComposedKey(), elemIndex);
+        elemIndex++;
+      } catch (IOException e) {
+        throw new RuntimeException(e); // wrap e itself: e.getCause() may be null
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          return elemIndex <= 0;
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          // TODO: support prefetch.
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public Iterable<T> read() {
+      return new BagStateIterable(elemIndex);
+    }
+
+    @Override
+    public BagState<T> readLater() {
+      // TODO: support prefetch.
+      return this;
+    }
+
+    @Override
+    public void clear() {
+      try {
+        for (int i = 0; i < elemIndex; i++) {
+          kvState.remove(getComposedKey(i));
+        }
+        stateInfoKvState.remove(getComposedKey());
+        elemIndex = 0;
+      } catch (IOException e) {
+        throw new RuntimeException(e); // wrap e itself: e.getCause() may be null
+      }
+    }
+
+    private ComposedKey getComposedKey() {
+      return ComposedKey.of(key, namespace);
+    }
+
+    private ComposedKey getComposedKey(int elemIndex) {
+      return ComposedKey.of(key, namespace, elemIndex);
+    }
+
+    /**
+     * Iterable over the bag; each iterator re-reads the element count from the state-info store.
+     */
+    private class BagStateIterable implements KvStoreIterable<T> {
+
+      private class BagStateIterator implements Iterator<T> {
+        private final int size;
+        private int cursor = 0;
+
+        BagStateIterator() {
+          Integer s = null;
+          try {
+            s = (Integer) stateInfoKvState.get(getComposedKey());
+          } catch (IOException e) {
+            LOG.error("Failed to get elemIndex for key={}", getComposedKey(), e);
+          }
+          this.size = s != null ? s + 1 : 0; // a failed or missing lookup yields an empty view
+        }
+
+        @Override
+        public boolean hasNext() {
+          return cursor < size;
+        }
+
+        @Override
+        public T next() {
+          if (cursor >= size) {
+            throw new NoSuchElementException();
+          }
+
+          T value = null;
+          try {
+            value = kvState.get(getComposedKey(cursor));
+          } catch (IOException e) {
+            LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor), e);
+          }
+          cursor++;
+          return value;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      }
+
+      private final int size;
+
+      BagStateIterable(int size) {
+        this.size = size;
+      }
+
+      @Override
+      public Iterator<T> iterator() {
+        return new BagStateIterator();
+      }
+
+      @Override
+      public String toString() {
+        return String.format("BagStateIterable: composedKey=%s", getComposedKey());
+      }
+    }
+  }
+
+  /**
+   * {@link CombiningState} that keeps un-merged accumulators in a {@link BagState}.
+   */
+  private static class JStormCombiningState<InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
+
+    // Never null: the constructor rejects null via checkNotNull.
+    private final BagState<AccumT> accumBagState;
+    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+
+    JStormCombiningState(
+        BagState<AccumT> accumBagState,
+        Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+      this.accumBagState = checkNotNull(accumBagState, "accumBagState");
+      this.combineFn = checkNotNull(combineFn, "combineFn");
+    }
+
+    @Override
+    public AccumT getAccum() {
+      // TODO: replacing the accumBagState with the merged accum.
+      return combineFn.mergeAccumulators(accumBagState.read());
+    }
+
+    @Override
+    public void addAccum(AccumT accumT) {
+      accumBagState.add(accumT);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
+      return combineFn.mergeAccumulators(iterable);
+    }
+
+    @Override
+    public void add(InputT input) {
+      accumBagState.add( // each input becomes its own fresh accumulator in the bag
+          combineFn.addInput(combineFn.createAccumulator(), input));
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return accumBagState.isEmpty();
+    }
+
+    @Override
+    public OutputT read() {
+      return combineFn.extractOutput( // merge everything in the bag, then extract the output
+          combineFn.mergeAccumulators(accumBagState.read()));
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
+      // TODO: support prefetch.
+      return this;
+    }
+
+    @Override
+    public void clear() {
+      accumBagState.clear();
+    }
+  }
+
+  /**
+   * {@link MapState} that delegates every operation to an {@link IKvStore}.
+   * @param <K> type of the map keys
+   * @param <V> type of the map values
+   */
+  private static class JStormMapState<K, V> implements MapState<K, V> {
+    // NOTE(review): key/namespace are never read below; kvStore is accessed by map key only.
+    private final K key;
+    private final StateNamespace namespace;
+    private final IKvStore<K, V> kvStore;
+
+    JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+      this.key = key;
+      this.namespace = namespace;
+      this.kvStore = kvStore;
+    }
+
+    @Override
+    public void put(K var1, V var2) {
+      try {
+        kvStore.put(var1, var2);
+      } catch (IOException e) {
+        reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
+      }
+    }
+
+    @Override
+    public ReadableState<V> putIfAbsent(K var1, V var2) {
+      ReadableState<V> ret = null; // always reassigned: reportError never returns normally
+      try {
+        V value = kvStore.get(var1);
+        if (value == null) {
+          kvStore.put(var1, var2);
+          ret = new MapReadableState<>(null);
+        } else {
+          ret = new MapReadableState<>(value);
+        }
+      } catch (IOException e) {
+        reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
+      }
+      return ret;
+    }
+
+    @Override
+    public void remove(K var1) {
+      try {
+        kvStore.remove(var1);
+      } catch (IOException e) {
+        reportError(String.format("Failed to remove key=%s", var1), e);
+      }
+    }
+
+    @Override
+    public ReadableState<V> get(K var1) {
+      ReadableState<V> ret = new MapReadableState<>(null);
+      try {
+        ret = new MapReadableState<>(kvStore.get(var1));
+      } catch (IOException e) {
+        reportError(String.format("Failed to get value for key=%s", var1), e);
+      }
+      return ret;
+    }
+
+    @Override
+    public ReadableState<Iterable<K>> keys() {
+      ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
+      try {
+        ret = new MapReadableState<>(kvStore.keys());
+      } catch (IOException e) {
+        reportError("Failed to get keys", e);
+      }
+      return ret;
+    }
+
+    @Override
+    public ReadableState<Iterable<V>> values() {
+      ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
+      try {
+        ret = new MapReadableState<>(kvStore.values());
+      } catch (IOException e) {
+        reportError("Failed to get values", e);
+      }
+      return ret;
+    }
+
+    @Override
+    public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+      ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
+      try {
+        ret = new MapReadableState<>(kvStore.entries());
+      } catch (IOException e) {
+        reportError("Failed to get entries", e);
+      }
+      return ret;
+    }
+
+    @Override
+    public void clear() {
+      try {
+        Iterable<K> keys = kvStore.keys();
+        kvStore.removeBatch(keys);
+      } catch (IOException e) {
+        reportError("Failed to clear map state", e);
+      }
+    }
+
+    private void reportError(String errorInfo, IOException e) {
+      LOG.error(errorInfo, e);
+      throw new RuntimeException(errorInfo, e); // keep the IOException as the cause
+    }
+
+    private static class MapReadableState<T> implements ReadableState<T> {
+      private final T value;
+
+      public MapReadableState(T value) {
+        this.value = value;
+      }
+
+      @Override
+      public T read() {
+        return value;
+      }
+
+      @Override
+      public ReadableState<T> readLater() {
+        return this;
+      }
+    }
+  }
+
+  /**
+   * JStorm {@link WatermarkHoldState}: holds go to both a TimerService and a GroupingState.
+   */
+  private static class JStormWatermarkHoldState implements WatermarkHoldState {
+
+    private final StateNamespace namespace;
+    private final GroupingState<Instant, Instant> watermarkHoldsState;
+    private final TimestampCombiner timestampCombiner;
+    private final TimerService timerService;
+
+    JStormWatermarkHoldState(
+        StateNamespace namespace,
+        GroupingState<Instant, Instant> watermarkHoldsState,
+        TimestampCombiner timestampCombiner,
+        TimerService timerService) {
+      this.namespace = checkNotNull(namespace, "namespace");
+      this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
+      this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
+      this.timerService = checkNotNull(timerService, "timerService");
+    }
+
+    @Override
+    public TimestampCombiner getTimestampCombiner() {
+      return timestampCombiner;
+    }
+
+    @Override
+    public void add(Instant instant) {
+      timerService.addWatermarkHold(namespace.stringKey(), instant); // keyed by namespace string
+      watermarkHoldsState.add(instant);
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return watermarkHoldsState.isEmpty();
+    }
+
+    @Override
+    public Instant read() {
+      return watermarkHoldsState.read(); // answered from the grouping state only
+    }
+
+    @Override
+    public WatermarkHoldState readLater() {
+      // TODO: support prefetch; until then this simply returns the same state object.
+      return this;
+    }
+
+    @Override
+    public void clear() {
+      timerService.clearWatermarkHold(namespace.stringKey()); // same key as used in add()
+      watermarkHoldsState.clear();
+    }
+  }
+
   private String getStoreId(String stateId) {
     return String.format("%s-%s", stateId, executorId);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
deleted file mode 100644
index 5d79d21..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.ValueState;
-
-/**
- * JStorm implementation of {@link ValueState}.
- */
-class JStormValueState<K, T> implements ValueState<T> {
-
-  @Nullable
-  private final K key;
-  private final StateNamespace namespace;
-  private final IKvStore<ComposedKey, T> kvState;
-
-  JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
-    this.key = key;
-    this.namespace = namespace;
-    this.kvState = kvState;
-  }
-
-  @Override
-  public void write(T t) {
-    try {
-      kvState.put(getComposedKey(), t);
-    } catch (IOException e) {
-      throw new RuntimeException(String.format(
-          "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
-    }
-  }
-
-  @Override
-  public T read() {
-    try {
-      return kvState.get(getComposedKey());
-    } catch (IOException e) {
-      throw new RuntimeException(String.format(
-          "Failed to read key: %s, namespace: %s.", key, namespace));
-    }
-  }
-
-  @Override
-  public ValueState<T> readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  @Override
-  public void clear() {
-    try {
-      kvState.remove(getComposedKey());
-    } catch (IOException e) {
-      throw new RuntimeException(String.format(
-          "Failed to clear key: %s, namespace: %s.", key, namespace));
-    }
-  }
-
-  private ComposedKey getComposedKey() {
-    return ComposedKey.of(key, namespace);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
deleted file mode 100644
index 7e1c28f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-
-/**
- * JStorm implementation of {@link WatermarkHoldState}.
- */
-class JStormWatermarkHoldState implements WatermarkHoldState {
-
-  private final StateNamespace namespace;
-  private final GroupingState<Instant, Instant> watermarkHoldsState;
-  private final TimestampCombiner timestampCombiner;
-  private final TimerService timerService;
-
-  JStormWatermarkHoldState(
-      StateNamespace namespace,
-      GroupingState<Instant, Instant> watermarkHoldsState,
-      TimestampCombiner timestampCombiner,
-      TimerService timerService) {
-    this.namespace = checkNotNull(namespace, "namespace");
-    this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
-    this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
-    this.timerService = checkNotNull(timerService, "timerService");
-  }
-
-  @Override
-  public TimestampCombiner getTimestampCombiner() {
-    return timestampCombiner;
-  }
-
-  @Override
-  public void add(Instant instant) {
-    timerService.addWatermarkHold(namespace.stringKey(), instant);
-    watermarkHoldsState.add(instant);
-  }
-
-  @Override
-  public ReadableState<Boolean> isEmpty() {
-    return watermarkHoldsState.isEmpty();
-  }
-
-  @Override
-  public Instant read() {
-    return watermarkHoldsState.read();
-  }
-
-  @Override
-  public WatermarkHoldState readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  @Override
-  public void clear() {
-    timerService.clearWatermarkHold(namespace.stringKey());
-    watermarkHoldsState.clear();
-  }
-}