You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/07 05:15:37 UTC

[1/4] storm git commit: STORM-2383 [storm-hbase] Support HBase as state backend

Repository: storm
Updated Branches:
  refs/heads/1.x-branch a647fbc62 -> caa442241


http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/storm-core/test/jvm/org/apache/storm/state/DefaultStateSerializerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/state/DefaultStateSerializerTest.java b/storm-core/test/jvm/org/apache/storm/state/DefaultStateSerializerTest.java
new file mode 100644
index 0000000..9462634
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/state/DefaultStateSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.state;
+
+import org.apache.storm.spout.CheckPointState;
+import org.apache.storm.state.DefaultStateSerializer;
+import org.apache.storm.state.Serializer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link DefaultStateSerializer}
+ */
+public class DefaultStateSerializerTest {
+
+    /**
+     * Round-trips values through {@link DefaultStateSerializer} and checks that the
+     * deserialized value equals the original. Covers a boxed {@code Long}, a
+     * {@link CheckPointState} with the default serializer, and a {@link CheckPointState}
+     * with a serializer constructed from an explicit list of classes to register.
+     */
+    @Test
+    public void testSerializeDeserialize() throws Exception {
+        Serializer<Long> s1 = new DefaultStateSerializer<Long>();
+        byte[] bytes;
+        long val = 100;
+        bytes = s1.serialize(val);
+        assertEquals(val, (long) s1.deserialize(bytes));
+
+        Serializer<CheckPointState> s2 = new DefaultStateSerializer<CheckPointState>();
+        CheckPointState cs = new CheckPointState(100, CheckPointState.State.COMMITTED);
+        bytes = s2.serialize(cs);
+        assertEquals(cs, (CheckPointState) s2.deserialize(bytes));
+
+        List<Class<?>> classesToRegister = new ArrayList<>();
+        classesToRegister.add(CheckPointState.class);
+        Serializer<CheckPointState> s3 = new DefaultStateSerializer<CheckPointState>(classesToRegister);
+        // Use s3 here: the serializer built with registered classes is the one under test.
+        bytes = s3.serialize(cs);
+        assertEquals(cs, (CheckPointState) s3.deserialize(bytes));
+
+    }
+}
\ No newline at end of file


[4/4] storm git commit: STORM-2383: CHANGELOG

Posted by ka...@apache.org.
STORM-2383: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/caa44224
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/caa44224
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/caa44224

Branch: refs/heads/1.x-branch
Commit: caa4422411b464c3d8c92f4bfe64a3f97eaf8629
Parents: 0f9e8e9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jul 7 14:15:18 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 7 14:15:18 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/caa44224/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 57b2d18..d5d63cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.2.0
+ * STORM-2383: [storm-hbase] Support HBase as state backend
  * STORM-2506: Print mapping between Task ID and Kafka Partitions
  * STORM-2601: add the timeout parameter to the method of getting the nimbus client
  * STORM-2369: [storm-redis] Use binary type for State management


[3/4] storm git commit: Merge branch 'STORM-2383-1.x' of https://github.com/HeartSaVioR/storm into STORM-2383-1.x-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2383-1.x' of https://github.com/HeartSaVioR/storm into STORM-2383-1.x-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f9e8e9f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f9e8e9f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f9e8e9f

Branch: refs/heads/1.x-branch
Commit: 0f9e8e9fd2cdc08aa0eb96cac8f646e10c220e4b
Parents: a647fbc 947149f
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jul 7 14:14:31 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 7 14:14:31 2017 +0900

----------------------------------------------------------------------
 docs/State-checkpointing.md                     |  87 +++-
 .../src/jvm/storm/starter/StatefulTopology.java |   2 +-
 external/storm-hbase/pom.xml                    |  20 +
 .../bolt/mapper/HBaseProjectionCriteria.java    |  16 +
 .../apache/storm/hbase/common/ColumnList.java   |  37 ++
 .../apache/storm/hbase/common/HBaseClient.java  |  40 ++
 .../storm/hbase/state/HBaseKeyValueState.java   | 421 +++++++++++++++++++
 .../hbase/state/HBaseKeyValueStateIterator.java | 155 +++++++
 .../hbase/state/HBaseKeyValueStateProvider.java | 165 ++++++++
 .../storm/hbase/state/HBaseClientTestUtil.java  | 377 +++++++++++++++++
 .../state/HBaseKeyValueStateIteratorTest.java   | 212 ++++++++++
 .../state/HBaseKeyValueStateProviderTest.java   |  96 +++++
 .../hbase/state/HBaseKeyValueStateTest.java     | 117 ++++++
 .../redis/state/DefaultStateSerializerTest.java |  56 ---
 .../storm/state/DefaultStateSerializerTest.java |  56 +++
 15 files changed, 1797 insertions(+), 60 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: STORM-2383 [storm-hbase] Support HBase as state backend

Posted by ka...@apache.org.
STORM-2383 [storm-hbase] Support HBase as state backend

* Implement HBase state backend
  * picked Redis state backend implementation
  * also implemented state iterator
* Added 'how to set up' to State-checkpointing doc
* address @arunmahadevan's review comment


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/947149f4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/947149f4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/947149f4

Branch: refs/heads/1.x-branch
Commit: 947149f45f6fc98ba96e407a8c8e3f62abea4479
Parents: fb24460
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Feb 27 21:01:13 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Jul 5 21:29:03 2017 +0900

----------------------------------------------------------------------
 docs/State-checkpointing.md                     |  87 +++-
 .../src/jvm/storm/starter/StatefulTopology.java |   2 +-
 external/storm-hbase/pom.xml                    |  20 +
 .../bolt/mapper/HBaseProjectionCriteria.java    |  16 +
 .../apache/storm/hbase/common/ColumnList.java   |  37 ++
 .../apache/storm/hbase/common/HBaseClient.java  |  40 ++
 .../storm/hbase/state/HBaseKeyValueState.java   | 421 +++++++++++++++++++
 .../hbase/state/HBaseKeyValueStateIterator.java | 155 +++++++
 .../hbase/state/HBaseKeyValueStateProvider.java | 165 ++++++++
 .../storm/hbase/state/HBaseClientTestUtil.java  | 377 +++++++++++++++++
 .../state/HBaseKeyValueStateIteratorTest.java   | 212 ++++++++++
 .../state/HBaseKeyValueStateProviderTest.java   |  96 +++++
 .../hbase/state/HBaseKeyValueStateTest.java     | 117 ++++++
 .../redis/state/DefaultStateSerializerTest.java |  56 ---
 .../storm/state/DefaultStateSerializerTest.java |  56 +++
 15 files changed, 1797 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/docs/State-checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/State-checkpointing.md b/docs/State-checkpointing.md
index 7015ce0..4be6e23 100644
--- a/docs/State-checkpointing.md
+++ b/docs/State-checkpointing.md
@@ -50,8 +50,8 @@ last committed by the framework during the previous run.
 can be changed by setting the storm config `topology.state.checkpoint.interval.ms`
 5. For state persistence, use a state provider that supports persistence by setting the `topology.state.provider` in the
 storm config. E.g. for using Redis based key-value state implementation set `topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider`
-in storm.yaml. The provider implementation jar should be in the class path, which in this case means putting the `storm-redis-*.jar`
-in the extlib directory.
+in storm.yaml. The provider implementation jar should be in the class path, which in this case means adding `storm-redis` 
+as a dependency of your topology, or adding `--artifacts "org.apache.storm:storm-redis:<storm-version>"` when submitting your topology with `storm jar`.
 6. The state provider properties can be overridden by setting `topology.state.provider.config`. For Redis state this is a
 json config with the following properties.
 
@@ -160,7 +160,10 @@ duplicate state updates during recovery.
 
 The state abstraction does not eliminate duplicate evaluations and currently provides only at-least once guarantee.
 
-In order to provide the at-least once guarantee, all bolts in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once its processed. For non-stateful bolts, the anchoring/acking can be automatically managed by extending the `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing like in the `WordCountBolt` example in the State management section above.
+In order to provide the at-least once guarantee, all bolts in a stateful topology are expected to anchor the tuples 
+while emitting and ack the input tuples once its processed. For non-stateful bolts, the anchoring/acking can be automatically 
+managed by extending the `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple 
+after processing like in the `WordCountBolt` example in the State management section above.
 
 ### IStateful bolt hooks
 IStateful bolt interface provides hook methods where in the stateful bolts could implement some custom actions.
@@ -204,3 +207,81 @@ The framework instantiates the state via the corresponding `StateProvider` imple
 a `StateProvider` implementation which can load and return the state based on the namespace. Each state belongs to a unique namespace.
 The namespace is typically unique per task so that each task can have its own state. The StateProvider and the corresponding
 State implementation should be available in the class path of Storm (by placing them in the extlib directory).
+
+
+### Supported State Backends
+
+#### Redis
+
+* State provider class name (`topology.state.provider`)
+
+`org.apache.storm.redis.state.RedisKeyValueStateProvider`
+
+* Provider config (`topology.state.provider.config`)
+
+```
+ {
+   "keyClass": "Optional fully qualified class name of the Key type.",
+   "valueClass": "Optional fully qualified class name of the Value type.",
+   "keySerializerClass": "Optional Key serializer implementation class.",
+   "valueSerializerClass": "Optional Value Serializer implementation class.",
+   "jedisPoolConfig": {
+     "host": "localhost",
+     "port": 6379,
+     "timeout": 2000,
+     "database": 0,
+     "password": "xyz"
+   }
+ }
+ ```
+ 
+* Artifacts to add (`--artifacts`)
+
+`org.apache.storm:storm-redis:<storm-version>`
+
+#### HBase
+
+In order to make state scalable, HBaseKeyValueState stores each state key-value pair in its own row. This introduces a `non-atomic` commit phase and guarantees only 
+eventual consistency on the HBase side. This does not matter from the state's point of view, because HBaseKeyValueState can still provide not-yet-committed values.
+Even if a worker crashes during the commit phase, after restart it will read the pending-commit states (stored atomically) from HBase, and the states will eventually be stored. 
+
+NOTE: The HBase state provider uses a pre-created table and column family, so users need to create them and supply their names in the provider config.
+
+You can simply create the table via `create 'state', 'cf'` in `hbase shell`, but in production you may want to set some more table properties.
+
+* State provider class name (`topology.state.provider`)
+
+`org.apache.storm.hbase.state.HBaseKeyValueStateProvider`
+
+* Provider config (`topology.state.provider.config`)
+        
+```
+ {
+   "keyClass": "Optional fully qualified class name of the Key type.",
+   "valueClass": "Optional fully qualified class name of the Value type.",
+   "keySerializerClass": "Optional Key serializer implementation class.",
+   "valueSerializerClass": "Optional Value Serializer implementation class.",
+   "hbaseConfigKey": "config key to load hbase configuration from storm root configuration. (similar to storm-hbase)",
+   "tableName": "Pre-created table name for state.",
+   "columnFamily": "Pre-created column family for state."
+ }
+ ```
+
+If you want to initialize HBase state provider from codebase, please see below example:
+
+```
+Config conf = new Config();
+    Map<String, Object> hbConf = new HashMap<String, Object>();
+    hbConf.put("hbase.rootdir", "file:///tmp/hbase");
+    conf.put("hbase.conf", hbConf);
+    conf.put("topology.state.provider",  "org.apache.storm.hbase.state.HBaseKeyValueStateProvider");
+    conf.put("topology.state.provider.config", "{" +
+            "   \"hbaseConfigKey\": \"hbase.conf\"," +
+            "   \"tableName\": \"state\"," +
+            "   \"columnFamily\": \"cf\"" +
+            " }");
+```
+
+* Artifacts to add (`--artifacts`)
+
+`org.apache.storm:storm-hbase:<storm-version>`
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
index ba513dd..b5224b1 100644
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
@@ -142,4 +142,4 @@ public class StatefulTopology {
             cluster.shutdown();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index a6e738b..dd8cd0b 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -96,5 +96,25 @@
             <artifactId>storm-autocreds</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
index 81e94b4..7325c62 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
@@ -20,6 +20,7 @@ package org.apache.storm.hbase.bolt.mapper;
 import com.google.common.collect.Lists;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -41,6 +42,11 @@ public class HBaseProjectionCriteria implements Serializable {
             this.qualifier = qualifier.getBytes();
         }
 
+        public ColumnMetaData(byte[] columnFamily, byte[] qualifier) {
+            this.columnFamily = Arrays.copyOf(columnFamily, columnFamily.length);
+            this.qualifier = Arrays.copyOf(qualifier, qualifier.length);
+        }
+
         public byte[] getColumnFamily() {
             return columnFamily;
         }
@@ -66,6 +72,16 @@ public class HBaseProjectionCriteria implements Serializable {
     }
 
     /**
+     * all columns from this family will be included as result of HBase lookup.
+     * @param columnFamily
+     * @return
+     */
+    public HBaseProjectionCriteria addColumnFamily(byte[] columnFamily) {
+        // Keep a private copy so the stored family is independent of the caller's array.
+        final byte[] familyCopy = Arrays.copyOf(columnFamily, columnFamily.length);
+        this.columnFamilies.add(familyCopy);
+        return this;
+    }
+
+    /**
      * Only this column from the the columnFamily will be included as result of HBase lookup.
      * @param column
      * @return

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
index 73703dc..0abe1ad 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
@@ -87,6 +87,7 @@ public class ColumnList {
 
 
     private ArrayList<Column> columns;
+    private ArrayList<Column> columnsToDelete;
     private ArrayList<Counter> counters;
 
 
@@ -97,6 +98,13 @@ public class ColumnList {
         return this.columns;
     }
 
+    // Returns the list of columns queued for deletion, creating it on first use.
+    private ArrayList<Column> columnsToDelete(){
+        ArrayList<Column> pending = this.columnsToDelete;
+        if (pending == null) {
+            pending = new ArrayList<Column>();
+            this.columnsToDelete = pending;
+        }
+        return pending;
+    }
+
     private ArrayList<Counter> counters(){
         if(this.counters == null){
             this.counters = new ArrayList<Counter>();
@@ -163,6 +171,17 @@ public class ColumnList {
         return this.addCounter(counter.family(), counter.qualifier(), counter.increment());
     }
 
+    /**
+     * Queue a standard HBase column for deletion.
+     *
+     * @param family column family of the column to delete
+     * @param qualifier qualifier of the column to delete
+     * @return this list, so that calls can be chained
+     */
+    public ColumnList deleteColumn(byte[] family, byte[] qualifier) {
+        final ArrayList<Column> pending = columnsToDelete();
+        // The entry is created with -1 and null as its last two constructor arguments
+        // (presumably timestamp and value -- confirm against the Column constructor).
+        pending.add(new Column(family, qualifier, -1, null));
+        return this;
+    }
 
     /**
      * Query to determine if we have column definitions.
@@ -174,6 +193,15 @@ public class ColumnList {
     }
 
     /**
+     * Query to determine if we have column delete definitions.
+     *
+     * @return
+     */
+    public boolean hasColumnsToDelete(){
+        // The delete list is allocated lazily by columnsToDelete(); null means none was created.
+        final boolean deleteListCreated = (null != this.columnsToDelete);
+        return deleteListCreated;
+    }
+
+    /**
      * Query to determine if we have counter definitions.
      *
      * @return
@@ -192,6 +220,15 @@ public class ColumnList {
     }
 
     /**
+     * Get the list of 'column to delete' definitions.
+     *
+     * @return
+     */
+    public ArrayList<Column> getColumnsToDelete() {
+        // Hands back the backing list itself (no copy); it is null until the list is first created.
+        final ArrayList<Column> pending = this.columnsToDelete;
+        return pending;
+    }
+
+    /**
      * Get the list of counter definitions.
      * @return
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
index c73bc41..f44d398 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
@@ -19,6 +19,7 @@ package org.apache.storm.hbase.common;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
@@ -83,6 +84,19 @@ public class HBaseClient implements Closeable{
             mutations.add(inc);
         }
 
+        if (cols.hasColumnsToDelete()) {
+            Delete delete = new Delete(rowKey);
+            delete.setDurability(durability);
+            for (ColumnList.Column col : cols.getColumnsToDelete()) {
+                if (col.getTs() > 0) {
+                    delete.addColumn(col.getFamily(), col.getQualifier(), col.getTs());
+                } else {
+                    delete.addColumn(col.getFamily(), col.getQualifier());
+                }
+            }
+            mutations.add(delete);
+        }
+
         if (mutations.isEmpty()) {
             mutations.add(new Put(rowKey));
         }
@@ -128,6 +142,32 @@ public class HBaseClient implements Closeable{
         }
     }
 
+    /**
+     * Opens a scanner on the table for the given row range.
+     *
+     * @param startRow first row bound, or null to use {@code HConstants.EMPTY_START_ROW}
+     * @param stopRow last row bound, or null to use {@code HConstants.EMPTY_END_ROW}
+     * @return the scanner returned by the table
+     * @throws Exception any failure while opening the scanner; it is logged and rethrown
+     */
+    public ResultScanner scan(byte[] startRow, byte[] stopRow) throws Exception {
+        final byte[] from = (startRow != null) ? startRow : HConstants.EMPTY_START_ROW;
+        final byte[] to = (stopRow != null) ? stopRow : HConstants.EMPTY_END_ROW;
+        try {
+            return table.getScanner(new Scan(from, to));
+        } catch (Exception e) {
+            LOG.warn("Could not open HBASE scanner.", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Asks the table whether the given Get would match anything.
+     *
+     * @param get the request to check
+     * @return the result of the table's existence check
+     * @throws Exception any failure during the check; it is logged and rethrown
+     */
+    public boolean exists(Get get) throws Exception {
+        boolean found;
+        try {
+            found = table.exists(get);
+        } catch (Exception e) {
+            LOG.warn("Could not perform HBASE existence check.", e);
+            throw e;
+        }
+        return found;
+    }
+
     @Override
     public void close() throws IOException {
         table.close();

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java
new file mode 100644
index 0000000..54a38a2
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.state;
+
+import com.google.common.collect.Maps;
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
+import org.apache.storm.hbase.common.ColumnList;
+import org.apache.storm.hbase.common.HBaseClient;
+import org.apache.storm.state.DefaultStateEncoder;
+import org.apache.storm.state.DefaultStateSerializer;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * A Hbase based implementation that persists the state in HBase.
+ */
+public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class);
+
+    // Column qualifier under which get() looks up a state value in its row.
+    // NOTE(review): public, non-final and a mutable array; left as-is to keep the public
+    // interface unchanged, but callers should treat it as a read-only constant.
+    public static byte[] STATE_QUALIFIER = "s".getBytes();
+    // Chunk size passed to HBaseKeyValueStateIterator by iterator().
+    public static final int ITERATOR_CHUNK_SIZE = 1000;
+
+    // Unmodifiable empty map used when there is no previously prepared commit to load.
+    public static final NavigableMap<byte[], byte[]> EMPTY_PENDING_COMMIT_MAP = Maps.unmodifiableNavigableMap(
+            new TreeMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator()));
+
+    // Keys used inside the txIds map; declared final because they are constants.
+    // PREPARE_TXID_KEY is written by prepareCommit(); COMMIT_TXID_KEY is presumably the
+    // counterpart written on commit -- confirm against commit().
+    private static final byte[] COMMIT_TXID_KEY = "commit".getBytes();
+    private static final byte[] PREPARE_TXID_KEY = "prepare".getBytes();
+
+    // Row-key material derived from the namespace in the constructor:
+    // namespace + "$key:", namespace + "$prepare" and namespace + "$txid" respectively.
+    private final byte[] keyNamespace;
+    private final byte[] prepareNamespace;
+    private final byte[] txidNamespace;
+    private final String namespace;
+    private final byte[] columnFamily;
+    private final DefaultStateEncoder<K, V> encoder;
+    private final HBaseClient hBaseClient;
+
+    // Updates made by put()/delete() since the last prepareCommit().
+    private ConcurrentNavigableMap<byte[], byte[]> pendingPrepare;
+    // Updates that have been prepared; applied to HBase by commit().
+    private NavigableMap<byte[], byte[]> pendingCommit;
+
+    // the key and value of txIds are guaranteed to be converted to UTF-8 encoded String
+    private NavigableMap<byte[], byte[]> txIds;
+
+    /**
+     * Creates a state that uses {@link DefaultStateSerializer} for both keys and values.
+     *
+     * @param hbaseClient client used for all HBase access
+     * @param columnFamily name of the column family to use
+     * @param namespace namespace from which the row keys of this state are derived
+     */
+    public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, String namespace) {
+        this(hbaseClient,
+                columnFamily,
+                namespace,
+                new DefaultStateSerializer<K>(),
+                new DefaultStateSerializer<V>());
+    }
+
+    /**
+     * Creates a state with explicit key and value serializers, then loads the stored
+     * transaction ids and any previously prepared commit from HBase.
+     *
+     * @param hBaseClient client used for all HBase access
+     * @param columnFamily name of the column family to use
+     * @param namespace namespace from which the row keys of this state are derived
+     * @param keySerializer serializer for keys
+     * @param valueSerializer serializer for values
+     */
+    public HBaseKeyValueState(HBaseClient hBaseClient, String columnFamily, String namespace,
+                              Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+
+        // Plain collaborators first.
+        this.namespace = namespace;
+        this.hBaseClient = hBaseClient;
+        this.columnFamily = columnFamily.getBytes();
+        this.encoder = new DefaultStateEncoder<K, V>(keySerializer, valueSerializer);
+
+        // Row-key material derived from the namespace.
+        this.keyNamespace = (namespace + "$key:").getBytes();
+        this.prepareNamespace = (namespace + "$prepare").getBytes();
+        this.txidNamespace = (namespace + "$txid").getBytes();
+
+        // In-memory buffer, then the state already persisted in HBase.
+        this.pendingPrepare = createPendingPrepareMap();
+        initTxids();
+        initPendingCommit();
+    }
+
+    /**
+     * Loads the transaction-id row from HBase into {@code txIds}. When the row is empty,
+     * {@code txIds} starts as an empty map ordered by unsigned lexicographical byte comparison.
+     * Any failure is rethrown wrapped in a RuntimeException.
+     */
+    private void initTxids() {
+        HBaseProjectionCriteria familyOnly = new HBaseProjectionCriteria();
+        familyOnly.addColumnFamily(columnFamily);
+        Get txidRowGet = hBaseClient.constructGetRequests(txidNamespace, familyOnly);
+        try {
+            Result txidRow = hBaseClient.batchGet(Collections.singletonList(txidRowGet))[0];
+            if (txidRow.isEmpty()) {
+                txIds = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
+            } else {
+                NavigableMap<byte[], byte[]> stored = txidRow.getFamilyMap(columnFamily);
+                txIds = new TreeMap<>(stored);
+            }
+
+            LOG.debug("initTxids, txIds {}", txIds);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Loads a previously prepared commit from the prepare row in HBase into
+     * {@code pendingCommit} as an unmodifiable map, or uses the shared empty map when the
+     * row is empty. Any failure is rethrown wrapped in a RuntimeException.
+     */
+    private void initPendingCommit() {
+        HBaseProjectionCriteria familyOnly = new HBaseProjectionCriteria();
+        familyOnly.addColumnFamily(columnFamily);
+        Get prepareRowGet = hBaseClient.constructGetRequests(prepareNamespace, familyOnly);
+        try {
+            Result prepareRow = hBaseClient.batchGet(Collections.singletonList(prepareRowGet))[0];
+            if (prepareRow.isEmpty()) {
+                LOG.debug("No previously prepared commits.");
+                pendingCommit = EMPTY_PENDING_COMMIT_MAP;
+            } else {
+                LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
+                NavigableMap<byte[], byte[]> prepared = prepareRow.getFamilyMap(columnFamily);
+                pendingCommit = Maps.unmodifiableNavigableMap(prepared);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Buffers an update in memory; nothing is written to HBase by this call.
+     *
+     * @param key state key
+     * @param value state value
+     */
+    @Override
+    public void put(K key, V value) {
+        LOG.debug("put key '{}', value '{}'", key, value);
+        final byte[] encodedKey = encoder.encodeKey(key);
+        final byte[] encodedValue = encoder.encodeValue(value);
+        pendingPrepare.put(encodedKey, encodedValue);
+    }
+
+    /**
+     * Looks a key up in the not-yet-prepared updates, then in the prepared updates, and
+     * finally in HBase. The first source that contains the key wins.
+     *
+     * @param key state key
+     * @return the decoded value, or null when no encoded value was found
+     */
+    @Override
+    public V get(K key) {
+        LOG.debug("get key '{}'", key);
+        final byte[] encodedKey = encoder.encodeKey(key);
+        byte[] encodedValue;
+
+        if (pendingPrepare.containsKey(encodedKey)) {
+            encodedValue = pendingPrepare.get(encodedKey);
+        } else if (pendingCommit.containsKey(encodedKey)) {
+            encodedValue = pendingCommit.get(encodedKey);
+        } else {
+            // Not buffered anywhere: read the state column of this key's row from HBase.
+            HBaseProjectionCriteria.ColumnMetaData stateColumn =
+                    new HBaseProjectionCriteria.ColumnMetaData(columnFamily, STATE_QUALIFIER);
+            HBaseProjectionCriteria projection = new HBaseProjectionCriteria();
+            projection.addColumn(stateColumn);
+            Get rowGet = hBaseClient.constructGetRequests(getRowKeyForStateKey(encodedKey), projection);
+            try {
+                Result row = hBaseClient.batchGet(Collections.singletonList(rowGet))[0];
+                encodedValue = row.getValue(stateColumn.getColumnFamily(), stateColumn.getQualifier());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        final V value = (encodedValue == null) ? null : encoder.decodeValue(encodedValue);
+        LOG.debug("Value for key '{}' is '{}'", key, value);
+        return value;
+    }
+
+    /**
+     * Same as {@link #get(Object)}, but returns {@code defaultValue} when the lookup yields null.
+     */
+    @Override
+    public V get(K key, V defaultValue) {
+        final V stored = get(key);
+        if (stored == null) {
+            return defaultValue;
+        }
+        return stored;
+    }
+
+    /**
+     * Marks a key as deleted by buffering the encoder's tombstone value for it.
+     *
+     * @param key state key
+     * @return the value {@link #get(Object)} returned for the key before it was marked
+     */
+    @Override
+    public V delete(K key) {
+        LOG.debug("delete key '{}'", key);
+        final byte[] encodedKey = encoder.encodeKey(key);
+        // Read the current value first; the tombstone written below would otherwise hide it.
+        final V previous = get(key);
+        pendingPrepare.put(encodedKey, encoder.getTombstoneValue());
+        return previous;
+    }
+
+    /**
+     * Builds an {@link HBaseKeyValueStateIterator} from the in-memory pending-prepare and
+     * pending-commit entries plus the HBase client, using {@link #ITERATOR_CHUNK_SIZE}.
+     */
+    @Override
+    public Iterator<Map.Entry<K, V>> iterator() {
+        final Iterator<Map.Entry<byte[], byte[]>> prepareEntries = pendingPrepare.entrySet().iterator();
+        final Iterator<Map.Entry<byte[], byte[]>> commitEntries = pendingCommit.entrySet().iterator();
+        return new HBaseKeyValueStateIterator<>(
+                namespace,
+                columnFamily,
+                hBaseClient,
+                prepareEntries,
+                commitEntries,
+                ITERATOR_CHUNK_SIZE,
+                encoder.getKeySerializer(),
+                encoder.getValueSerializer());
+    }
+
+    /**
+     * First phase of the two-phase commit: persists the pending writes under the prepare row.
+     *
+     * <p>The current pending-prepare map is swapped for a fresh one. If the prepare row already holds
+     * data, entries of the pending-commit map whose keys were not written again are merged in.
+     * The merged map is written to the prepare row (when non-empty), the prepare txid is stored in the
+     * txid row, and the merged map becomes the new, unmodifiable pending-commit map.
+     *
+     * @param txid the transaction id being prepared
+     * @throws RuntimeException if the txid fails validation or an HBase operation fails
+     */
+    @Override
+    public void prepareCommit(long txid) {
+        LOG.debug("prepareCommit txid {}", txid);
+        validatePrepareTxid(txid);
+
+        try {
+            ConcurrentNavigableMap<byte[], byte[]> currentPending = pendingPrepare;
+            pendingPrepare = createPendingPrepareMap();
+
+            Result result = getColumnFamily(prepareNamespace, columnFamily);
+            if (!result.isEmpty()) {
+                LOG.debug("Prepared txn already exists, will merge, txid {}", txid);
+                for (Map.Entry<byte[], byte[]> e : pendingCommit.entrySet()) {
+                    // newer writes in currentPending win over the earlier prepared entries
+                    currentPending.putIfAbsent(e.getKey(), e.getValue());
+                }
+            } else {
+                LOG.debug("No previously prepared data to merge, txid {}.", txid);
+            }
+
+            if (!currentPending.isEmpty()) {
+                mutateRow(prepareNamespace, columnFamily, currentPending);
+            } else {
+                LOG.debug("Nothing to save for prepareCommit, txid {}.", txid);
+            }
+
+            txIds.put(PREPARE_TXID_KEY, String.valueOf(txid).getBytes());
+            mutateRow(txidNamespace, columnFamily, txIds);
+            pendingCommit = Maps.unmodifiableNavigableMap(currentPending);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Second phase of the two-phase commit: applies the pending-commit map to HBase.
+     *
+     * <p>After the data is applied, the commit txid is stored in the txid row, the prepare row is
+     * deleted and the pending-commit map is reset to the shared empty map.
+     *
+     * @param txid the transaction id being committed
+     * @throws RuntimeException if the txid fails validation or an HBase operation fails
+     */
+    @Override
+    public void commit(long txid) {
+        LOG.debug("commit txid {}", txid);
+        validateCommitTxid(txid);
+        try {
+            if (pendingCommit.isEmpty()) {
+                LOG.debug("Nothing to save for commit, txid {}.", txid);
+            } else {
+                applyPendingStateToHBase(pendingCommit);
+            }
+            byte[] committedTxid = Long.toString(txid).getBytes();
+            txIds.put(COMMIT_TXID_KEY, committedTxid);
+            mutateRow(txidNamespace, columnFamily, txIds);
+            deleteRow(prepareNamespace);
+            pendingCommit = EMPTY_PENDING_COMMIT_MAP;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Applies the pending-prepare map directly to HBase, without any txid bookkeeping, and then
+     * starts over with a fresh pending-prepare map.
+     *
+     * @throws RuntimeException wrapping any exception raised while writing to HBase
+     */
+    @Override
+    public void commit() {
+        if (pendingPrepare.isEmpty()) {
+            LOG.debug("Nothing to save for commit");
+        } else {
+            try {
+                applyPendingStateToHBase(pendingPrepare);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        pendingPrepare = createPendingPrepareMap();
+    }
+
+    /**
+     * Discards prepared and pending state.
+     *
+     * <p>The prepare row is deleted when it exists, the prepare txid is reset to the last committed
+     * txid (or removed when nothing has been committed), the txid row is rewritten when there is
+     * anything to store, and both in-memory pending maps are emptied.
+     *
+     * @throws RuntimeException wrapping any exception raised by an HBase operation
+     */
+    @Override
+    public void rollback() {
+        LOG.debug("rollback");
+        try {
+            if (!existsRow(prepareNamespace)) {
+                LOG.debug("Nothing to rollback, prepared data is empty");
+            } else {
+                deleteRow(prepareNamespace);
+            }
+
+            Long lastCommittedId = lastCommittedTxid();
+            if (lastCommittedId == null) {
+                txIds.remove(PREPARE_TXID_KEY);
+            } else {
+                txIds.put(PREPARE_TXID_KEY, lastCommittedId.toString().getBytes());
+            }
+
+            if (!txIds.isEmpty()) {
+                LOG.debug("put txidNamespace {}, txIds {}", txidNamespace, txIds);
+                mutateRow(txidNamespace, columnFamily, txIds);
+            }
+
+            pendingCommit = EMPTY_PENDING_COMMIT_MAP;
+            pendingPrepare = createPendingPrepareMap();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /*
+     * A txid may be prepared only when it is strictly greater than the last committed txid,
+     * if there is one. The same txid can therefore be prepared again as long as it has not
+     * been committed yet.
+     */
+    private void validatePrepareTxid(long txid) {
+        Long committedTxid = lastCommittedTxid();
+        if (committedTxid != null && txid <= committedTxid) {
+            throw new RuntimeException("Invalid txid '" + txid + "' for prepare. Txid '" + committedTxid +
+                    "' is already committed");
+        }
+    }
+
+    /*
+     * A txid may be committed when it is not older than the last committed txid (so the same
+     * txid can be committed again) and, when a prepared txid is recorded, it equals that one.
+     */
+    private void validateCommitTxid(long txid) {
+        Long committedTxid = lastCommittedTxid();
+        if (committedTxid != null && txid < committedTxid) {
+            throw new RuntimeException("Invalid txid '" + txid + "' txid '" + committedTxid + "' is already committed");
+        }
+
+        Long preparedTxid = lastPreparedTxid();
+        if (preparedTxid != null && txid != preparedTxid) {
+            throw new RuntimeException("Invalid txid '" + txid + "' not same as prepared txid '" + preparedTxid + "'");
+        }
+    }
+
+
+    /**
+     * Reads the txid stored under {@code COMMIT_TXID_KEY}.
+     *
+     * @return the stored commit txid, or {@code null} when none is stored
+     */
+    private Long lastCommittedTxid() {
+        Long committed = lastId(COMMIT_TXID_KEY);
+        return committed;
+    }
+
+    /**
+     * Reads the txid stored under {@code PREPARE_TXID_KEY}.
+     *
+     * @return the stored prepare txid, or {@code null} when none is stored
+     */
+    private Long lastPreparedTxid() {
+        Long prepared = lastId(PREPARE_TXID_KEY);
+        return prepared;
+    }
+
+    /**
+     * Parses the txid stored in {@code txIds} under the given key.
+     *
+     * @param key the txid map key to read
+     * @return the stored bytes parsed as a decimal {@code Long}, or {@code null} when the key is absent
+     */
+    private Long lastId(byte[] key) {
+        byte[] stored = txIds.get(key);
+        return (stored == null) ? null : Long.valueOf(new String(stored));
+    }
+
+    /**
+     * Builds the HBase row key for a state key by prefixing it with {@code keyNamespace}.
+     *
+     * @param columnKey the encoded state key
+     * @return a new array holding {@code keyNamespace} followed by {@code columnKey}
+     */
+    private byte[] getRowKeyForStateKey(byte[] columnKey) {
+        int prefixLength = keyNamespace.length;
+        // copyOf yields the prefix followed by room for the state key
+        byte[] rowKey = Arrays.copyOf(keyNamespace, prefixLength + columnKey.length);
+        System.arraycopy(columnKey, 0, rowKey, prefixLength, columnKey.length);
+        return rowKey;
+    }
+
+    /**
+     * Converts every pending entry into HBase mutations and submits them as one batch.
+     *
+     * <p>An entry whose value equals the encoder's tombstone value becomes a {@code Delete} of the
+     * corresponding row; any other entry becomes a write of the value to the state qualifier.
+     *
+     * @param pendingMap encoded state keys mapped to encoded values or tombstones
+     * @throws Exception if building or submitting the batch fails
+     */
+    private void applyPendingStateToHBase(NavigableMap<byte[], byte[]> pendingMap) throws Exception {
+        List<Mutation> batch = new ArrayList<>();
+        for (Map.Entry<byte[], byte[]> pending : pendingMap.entrySet()) {
+            byte[] stateKey = pending.getKey();
+            byte[] encodedValue = pending.getValue();
+
+            boolean deleted = Arrays.equals(encodedValue, encoder.getTombstoneValue());
+            if (deleted) {
+                batch.add(new Delete(getRowKeyForStateKey(stateKey)));
+            } else {
+                batch.addAll(prepareMutateRow(getRowKeyForStateKey(stateKey), columnFamily,
+                        Collections.singletonMap(STATE_QUALIFIER, encodedValue)));
+            }
+        }
+
+        hBaseClient.batchMutate(batch);
+    }
+
+    /**
+     * Fetches a single row, projected onto one column family.
+     *
+     * @param rowKey the row to read
+     * @param columnFamily the column family to project
+     * @return the first (and only requested) result of the batch get
+     * @throws Exception if the HBase read fails
+     */
+    private Result getColumnFamily(byte[] rowKey, byte[] columnFamily) throws Exception {
+        HBaseProjectionCriteria wholeFamily = new HBaseProjectionCriteria();
+        wholeFamily.addColumnFamily(columnFamily);
+        Get request = hBaseClient.constructGetRequests(rowKey, wholeFamily);
+        return hBaseClient.batchGet(Collections.singletonList(request))[0];
+    }
+
+    /**
+     * Builds the mutations for one row using {@code Durability.USE_DEFAULT}.
+     *
+     * @param rowKey the row to mutate
+     * @param columnFamily the column family to write into
+     * @param map qualifiers mapped to the values to write
+     * @return the mutations produced by the four-argument overload
+     */
+    private List<Mutation> prepareMutateRow(byte[] rowKey, byte[] columnFamily, Map<byte[], byte[]> map) {
+        Durability defaultDurability = Durability.USE_DEFAULT;
+        return prepareMutateRow(rowKey, columnFamily, map, defaultDurability);
+    }
+
+    /**
+     * Builds the mutations for one row with an explicit durability.
+     *
+     * @param rowKey the row to mutate
+     * @param columnFamily the column family to write into
+     * @param map qualifiers mapped to the values to write
+     * @param durability the durability passed on to the HBase client
+     * @return the mutations constructed by the HBase client
+     */
+    private List<Mutation> prepareMutateRow(byte[] rowKey, byte[] columnFamily, Map<byte[], byte[]> map,
+                                            Durability durability) {
+        return hBaseClient.constructMutationReq(rowKey, buildColumnList(columnFamily, map), durability);
+    }
+
+    /**
+     * Writes the given columns to one row using {@code Durability.USE_DEFAULT}.
+     *
+     * @param rowKey the row to mutate
+     * @param columnFamily the column family to write into
+     * @param map qualifiers mapped to the values to write
+     * @throws Exception if the HBase write fails
+     */
+    private void mutateRow(byte[] rowKey, byte[] columnFamily, Map<byte[], byte[]> map)
+            throws Exception {
+        Durability defaultDurability = Durability.USE_DEFAULT;
+        mutateRow(rowKey, columnFamily, map, defaultDurability);
+    }
+
+    /**
+     * Writes the given columns to one row with an explicit durability.
+     *
+     * @param rowKey the row to mutate
+     * @param columnFamily the column family to write into
+     * @param map qualifiers mapped to the values to write
+     * @param durability the durability passed on to the HBase client
+     * @throws Exception if the HBase write fails
+     */
+    private void mutateRow(byte[] rowKey, byte[] columnFamily, Map<byte[], byte[]> map,
+                           Durability durability) throws Exception {
+        List<Mutation> mutations = prepareMutateRow(rowKey, columnFamily, map, durability);
+        hBaseClient.batchMutate(mutations);
+    }
+
+    /**
+     * Asks the HBase client whether a row exists.
+     *
+     * @param rowKey the row to probe
+     * @return the result of the client's {@code exists} call for that row
+     * @throws Exception if the HBase call fails
+     */
+    private boolean existsRow(byte[] rowKey) throws Exception {
+        return hBaseClient.exists(new Get(rowKey));
+    }
+
+    /**
+     * Submits a single {@code Delete} for the given row.
+     *
+     * @param rowKey the row to delete
+     * @throws Exception if the HBase call fails
+     */
+    private void deleteRow(byte[] rowKey) throws Exception {
+        List<Mutation> singleDelete = Collections.<Mutation>singletonList(new Delete(rowKey));
+        hBaseClient.batchMutate(singleDelete);
+    }
+
+    /**
+     * Turns a qualifier-to-value map into a {@code ColumnList} for one column family.
+     *
+     * @param columnFamily the column family every column is added under
+     * @param map qualifiers mapped to values
+     * @return a column list with one column per map entry
+     */
+    private ColumnList buildColumnList(byte[] columnFamily, Map<byte[], byte[]> map) {
+        ColumnList columns = new ColumnList();
+        for (Map.Entry<byte[], byte[]> qualifierAndValue : map.entrySet()) {
+            byte[] qualifier = qualifierAndValue.getKey();
+            byte[] value = qualifierAndValue.getValue();
+            columns.addColumn(columnFamily, qualifier, value);
+        }
+        return columns;
+    }
+
+    /**
+     * Creates an empty pending-prepare map. This lives in its own method because only
+     * pendingPrepare needs a {@link ConcurrentNavigableMap}; keys are ordered by unsigned
+     * lexicographical byte comparison.
+     */
+    private ConcurrentNavigableMap<byte[], byte[]> createPendingPrepareMap() {
+        return new ConcurrentSkipListMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java
new file mode 100644
index 0000000..dd9c20e
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.state;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.storm.hbase.common.HBaseClient;
+import org.apache.storm.state.BaseBinaryStateIterator;
+import org.apache.storm.state.DefaultStateEncoder;
+import org.apache.storm.state.Serializer;
+
+import org.apache.storm.state.StateEncoder;
+
+import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER;
+
+/**
+ * An iterator over {@link HBaseKeyValueState}.
+ *
+ * <p>Rows are read from HBase in chunks of {@code chunkSize} by scanning from a moving cursor key
+ * up to the end of the state's key namespace. Every scanner opened here is closed again before
+ * the method that opened it returns.
+ */
+public class HBaseKeyValueStateIterator<K, V> extends BaseBinaryStateIterator<K, V> {
+
+    private final byte[] keyNamespace;
+    private byte[] cursorKey;
+    private final byte[] endScanKey;
+    private final byte[] columnFamily;
+    private final HBaseClient hBaseClient;
+    private final int chunkSize;
+    private final StateEncoder<K, V, byte[], byte[]> encoder;
+
+    private Iterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
+
+    /**
+     * Constructor.
+     *
+     * @param namespace The namespace of State
+     * @param columnFamily The column family of state
+     * @param hBaseClient The instance of HBaseClient
+     * @param pendingPrepareIterator The iterator of pendingPrepare
+     * @param pendingCommitIterator The iterator of pendingCommit
+     * @param chunkSize The size of chunk to get entries from HBase
+     * @param keySerializer The serializer of key
+     * @param valueSerializer The serializer of value
+     */
+    public HBaseKeyValueStateIterator(String namespace, byte[] columnFamily, HBaseClient hBaseClient,
+                                      Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator,
+                                      Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator,
+                                      int chunkSize, Serializer<K> keySerializer,
+                                      Serializer<V> valueSerializer) {
+        super(pendingPrepareIterator, pendingCommitIterator);
+        this.columnFamily = columnFamily;
+        this.keyNamespace = (namespace + "$key:").getBytes();
+        // the scan starts at the namespace prefix itself; keep a separate array for the cursor
+        this.cursorKey = this.keyNamespace.clone();
+
+        // this is the end key for whole scan
+        this.endScanKey = advanceRow(this.keyNamespace);
+        this.hBaseClient = hBaseClient;
+        this.chunkSize = chunkSize;
+        this.encoder = new DefaultStateEncoder<K, V>(keySerializer, valueSerializer);
+    }
+
+    @Override
+    protected Iterator<Map.Entry<byte[], byte[]>> loadChunkFromStateStorage() {
+        loadChunkFromHBase();
+        return cachedResultIterator;
+    }
+
+    @Override
+    protected boolean isEndOfDataFromStorage() {
+        if (cachedResultIterator != null && cachedResultIterator.hasNext()) {
+            return false;
+        }
+
+        // try-with-resources: the probing scanner must not be left open
+        try (ResultScanner resultScanner = hBaseClient.scan(cursorKey, endScanKey)) {
+            return !(resultScanner.iterator().hasNext());
+        } catch (Exception e) {
+            // keep the cause so the underlying HBase failure stays visible
+            throw new RuntimeException("Fail to scan from HBase state storage.", e);
+        }
+    }
+
+    @Override
+    protected K decodeKey(byte[] key) {
+        return encoder.decodeKey(key);
+    }
+
+    @Override
+    protected V decodeValue(byte[] value) {
+        return encoder.decodeValue(value);
+    }
+
+    @Override
+    protected boolean isTombstoneValue(byte[] value) {
+        return Arrays.equals(value, encoder.getTombstoneValue());
+    }
+
+    /**
+     * Reads up to {@code chunkSize} rows starting at the cursor, caches them keyed by state key, and
+     * moves the cursor just past the last row that was read.
+     */
+    private void loadChunkFromHBase() {
+        Map<byte[], byte[]> chunk = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
+        try (ResultScanner resultScanner = hBaseClient.scan(cursorKey, endScanKey)) {
+            Result[] results = resultScanner.next(chunkSize);
+
+            for (Result result : results) {
+                byte[] columnKey = extractStateKeyFromRowKey(result.getRow());
+                byte[] columnValue = result.getValue(columnFamily, STATE_QUALIFIER);
+
+                chunk.put(columnKey, columnValue);
+            }
+
+            if (results.length > 0) {
+                byte[] lastRow = results[results.length - 1].getRow();
+                cursorKey = nextRowKey(lastRow);
+            }
+
+            cachedResultIterator = chunk.entrySet().iterator();
+        } catch (Exception e) {
+            throw new RuntimeException("Fail to scan from HBase state storage.", e);
+        }
+    }
+
+    /**
+     * Returns a copy of {@code row} with its last byte incremented. Used only to derive the upper
+     * bound of the scan from the key-namespace prefix, which ends in {@code ':'} and therefore
+     * cannot overflow.
+     */
+    private byte[] advanceRow(byte[] row) {
+        byte[] advancedRow = Arrays.copyOf(row, row.length);
+        advancedRow[row.length - 1]++;
+        return advancedRow;
+    }
+
+    /**
+     * Returns the smallest row key that sorts strictly after {@code row}: the row followed by a
+     * single 0x00 byte. Unlike incrementing the last byte, this neither wraps around when the last
+     * byte is 0xFF nor skips rows that have {@code row} as a proper prefix.
+     * NOTE(review): relies on the scan's start row being inclusive - confirm against HBaseClient.scan.
+     */
+    private byte[] nextRowKey(byte[] row) {
+        // Arrays.copyOf pads the extra position with 0x00
+        return Arrays.copyOf(row, row.length + 1);
+    }
+
+    /** Strips the key-namespace prefix from a row key, leaving the encoded state key. */
+    private byte[] extractStateKeyFromRowKey(byte[] row) {
+        return Arrays.copyOfRange(row, keyNamespace.length, row.length);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java
new file mode 100644
index 0000000..001400a
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.state;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.hbase.common.HBaseClient;
+import org.apache.storm.state.DefaultStateSerializer;
+import org.apache.storm.state.Serializer;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateProvider;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Provides {@link HBaseKeyValueState}.
+ *
+ * <p>The provider settings are read as JSON from the topology configuration entry
+ * {@code Config.TOPOLOGY_STATE_PROVIDER_CONFIG} and mapped onto {@link StateConfig};
+ * {@code hbaseConfigKey}, {@code tableName} and {@code columnFamily} are mandatory.
+ */
+public class HBaseKeyValueStateProvider implements StateProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueStateProvider.class);
+
+    /**
+     * Creates the HBase-backed state for a namespace.
+     *
+     * @param namespace the state namespace
+     * @param stormConf the topology configuration
+     * @param context the topology context (not used by this provider)
+     * @return a new {@link HBaseKeyValueState}
+     * @throws RuntimeException wrapping any failure while reading the configuration or creating the state
+     */
+    @Override
+    public State newState(String namespace, Map stormConf, TopologyContext context) {
+        try {
+            return getHBaseKeyValueState(namespace, stormConf, getStateConfig(stormConf));
+        } catch (Exception ex) {
+            // pass the exception as the trailing argument so the stack trace is logged too
+            LOG.error("Error loading config from storm conf {}", stormConf, ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Parses the provider configuration and checks its mandatory fields.
+     *
+     * @param stormConf the topology configuration
+     * @return the parsed configuration, or a default instance when no provider config entry exists
+     * @throws IllegalArgumentException if a mandatory field is null or empty
+     * @throws Exception if the JSON cannot be parsed
+     */
+    StateConfig getStateConfig(Map stormConf) throws Exception {
+        StateConfig stateConfig;
+        ObjectMapper mapper = new ObjectMapper();
+        // StateConfig has no accessors, so let Jackson bind the fields directly
+        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+        if (stormConf.containsKey(Config.TOPOLOGY_STATE_PROVIDER_CONFIG)) {
+            String providerConfig = (String) stormConf.get(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
+            stateConfig = mapper.readValue(providerConfig, StateConfig.class);
+        } else {
+            stateConfig = new StateConfig();
+        }
+
+        // assertion
+        assertMandatoryParameterNotEmpty(stateConfig.hbaseConfigKey, "hbaseConfigKey");
+        assertMandatoryParameterNotEmpty(stateConfig.tableName, "tableName");
+        assertMandatoryParameterNotEmpty(stateConfig.columnFamily, "columnFamily");
+
+        return stateConfig;
+    }
+
+    private HBaseKeyValueState getHBaseKeyValueState(String namespace, Map stormConf, StateConfig config) throws Exception {
+        Map<String, Object> conf = getHBaseConfigMap(stormConf, config.hbaseConfigKey);
+        final Configuration hbConfig = getHBaseConfigurationInstance(conf);
+
+        // hack for backward compatibility, we need to pass TOPOLOGY_AUTO_CREDENTIALS to hbase conf
+        // the conf instance is instance of persistentMap so making a copy.
+        Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
+        hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, stormConf.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
+        HBaseClient hbaseClient = new HBaseClient(hbaseConfMap, hbConfig, config.tableName);
+
+        return new HBaseKeyValueState(hbaseClient, config.columnFamily, namespace,
+                getKeySerializer(config), getValueSerializer(config));
+    }
+
+    /** Copies every entry of the given map, stringified, into a fresh HBase configuration. */
+    private Configuration getHBaseConfigurationInstance(Map<String, Object> conf) {
+        final Configuration hbConfig = HBaseConfiguration.create();
+        for (Map.Entry<String, Object> entry : conf.entrySet()) {
+            hbConfig.set(entry.getKey(), String.valueOf(entry.getValue()));
+        }
+        return hbConfig;
+    }
+
+    /**
+     * Fetches the HBase settings stored in the topology configuration under {@code hbaseConfigKey}.
+     *
+     * @throws IllegalArgumentException if no entry exists for the key
+     */
+    private Map<String, Object> getHBaseConfigMap(Map<String, Object> stormConfMap, String hbaseConfigKey) {
+        Map<String, Object> conf = (Map<String, Object>) stormConfMap.get(hbaseConfigKey);
+        if (conf == null) {
+            throw new IllegalArgumentException("HBase configuration not found using key '" + hbaseConfigKey + "'");
+        }
+
+        if (conf.get("hbase.rootdir") == null) {
+            LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
+        }
+        return conf;
+    }
+
+    /** Rejects a mandatory setting that is null or the empty string. */
+    private void assertMandatoryParameterNotEmpty(String paramValue, String fieldName) {
+        // plain JDK check; avoids depending on an HBase-internal string utility
+        if (paramValue == null || paramValue.isEmpty()) {
+            throw new IllegalArgumentException(fieldName + " should be provided.");
+        }
+    }
+
+    private Serializer getKeySerializer(StateConfig config) throws Exception {
+        return createSerializer(config.keySerializerClass, config.keyClass);
+    }
+
+    private Serializer getValueSerializer(StateConfig config) throws Exception {
+        return createSerializer(config.valueSerializerClass, config.valueClass);
+    }
+
+    /**
+     * Picks a serializer: an instance of the named serializer class when one is configured,
+     * otherwise a default serializer initialised with the named data class, otherwise a plain default serializer.
+     */
+    private Serializer createSerializer(String serializerClass, String dataClass) throws Exception {
+        if (serializerClass != null) {
+            Class<?> klass = (Class<?>) Class.forName(serializerClass);
+            return (Serializer) klass.newInstance();
+        }
+        if (dataClass != null) {
+            return new DefaultStateSerializer(Collections.singletonList(Class.forName(dataClass)));
+        }
+        return new DefaultStateSerializer();
+    }
+
+    /** Provider settings as bound from the JSON provider configuration. */
+    static class StateConfig {
+        String keyClass;
+        String valueClass;
+        String keySerializerClass;
+        String valueSerializerClass;
+        String hbaseConfigKey;
+        String tableName;
+        String columnFamily;
+
+        @Override
+        public String toString() {
+            return "StateConfig{" +
+                    "keyClass='" + keyClass + '\'' +
+                    ", valueClass='" + valueClass + '\'' +
+                    ", keySerializerClass='" + keySerializerClass + '\'' +
+                    ", valueSerializerClass='" + valueSerializerClass + '\'' +
+                    ", hbaseConfigKey='" + hbaseConfigKey + '\'' +
+                    ", tableName='" + tableName + '\'' +
+                    ", columnFamily='" + columnFamily + '\'' +
+                    '}';
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java
new file mode 100644
index 0000000..634620e
--- /dev/null
+++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.state;
+
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
+import org.apache.storm.hbase.common.ColumnList;
+import org.apache.storm.hbase.common.HBaseClient;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+
+public class HBaseClientTestUtil {
+    /** Holder of static helpers only; the private constructor prevents instantiation. */
+    private HBaseClientTestUtil() {
+        // no instances
+    }
+
+    /**
+     * Creates a mocked {@link HBaseClient} backed by a new, empty in-memory map.
+     *
+     * @return the mocked client
+     * @throws Exception if stubbing the mock fails
+     */
+    public static HBaseClient mockedHBaseClient() throws Exception {
+        ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> emptyTable =
+                new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
+        return mockedHBaseClient(emptyTable);
+    }
+
+    /**
+     * Creates a mocked {@link HBaseClient} whose exists, batch-get, batch-mutate and scan calls are
+     * answered from the given in-memory map, while request construction uses the real methods.
+     *
+     * @param internalMap the map handed to the answers (row key to column family to qualifier to value)
+     * @return the mocked client
+     * @throws Exception if stubbing the mock fails
+     */
+    public static HBaseClient mockedHBaseClient(
+            ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> internalMap)
+            throws Exception {
+        HBaseClient client = mock(HBaseClient.class);
+
+        // closing the mock is a no-op
+        Mockito.doNothing().when(client).close();
+
+        // request builders delegate to the real implementation
+        Mockito.when(client.constructGetRequests(any(byte[].class), any(HBaseProjectionCriteria.class)))
+                .thenCallRealMethod();
+        Mockito.when(client.constructMutationReq(any(byte[].class), any(ColumnList.class), any(Durability.class)))
+                .thenCallRealMethod();
+
+        // data access is served by answers working on internalMap
+        Mockito.when(client.exists(any(Get.class))).thenAnswer(new ExistsAnswer(internalMap));
+        Mockito.when(client.batchGet(any(List.class))).thenAnswer(new BatchGetAnswer(internalMap));
+        Mockito.doAnswer(new BatchMutateAnswer(internalMap)).when(client).batchMutate(any(List.class));
+        Mockito.when(client.scan(any(byte[].class), any(byte[].class))).thenAnswer(new ScanAnswer(internalMap));
+
+        return client;
+    }
+
+    /**
+     * Static helpers that turn entries of the in-memory maps into {@link Cell}s appended to a list.
+     */
+    static class BuildCellsHelper {
+        /**
+         * For every requested column family that is present in the stored row, adds either all of its
+         * qualifiers (when no qualifiers were requested) or only the requested ones.
+         */
+        public static void addMatchingColumnFamilies(byte[] rowKey, Map<byte[], NavigableSet<byte[]>> familyMap,
+                                               NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValueMap,
+                                               List<Cell> cells) {
+            for (Map.Entry<byte[], NavigableSet<byte[]>> requested : familyMap.entrySet()) {
+                byte[] columnFamily = requested.getKey();
+                NavigableMap<byte[], byte[]> stored = cfToQualifierToValueMap.get(columnFamily);
+                if (stored == null) {
+                    // requested family is not present in this row
+                    continue;
+                }
+
+                NavigableSet<byte[]> qualifiers = requested.getValue();
+                if (qualifiers == null || qualifiers.isEmpty()) {
+                    addAllQualifiers(rowKey, columnFamily, stored, cells);
+                } else {
+                    addMatchingQualifiers(rowKey, columnFamily, requested, stored, cells);
+                }
+            }
+        }
+
+        /** Adds a cell for every requested qualifier that has a stored value. */
+        public static void addMatchingQualifiers(byte[] rowKey, byte[] columnFamily,
+                                           Map.Entry<byte[], NavigableSet<byte[]>> qualifierSet,
+                                           NavigableMap<byte[], byte[]> qualifierToValueMap,
+                                           List<Cell> cells) {
+            for (byte[] qualifier : qualifierSet.getValue()) {
+                byte[] value = qualifierToValueMap.get(qualifier);
+                if (value == null) {
+                    continue;
+                }
+                cells.add(new KeyValue(rowKey, columnFamily, qualifier, value));
+            }
+        }
+
+        /** Adds a cell for every qualifier of every stored column family. */
+        public static void addAllColumnFamilies(byte[] rowKey, NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValueMap,
+                                          List<Cell> cells) {
+            for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> family : cfToQualifierToValueMap.entrySet()) {
+                addAllQualifiers(rowKey, family.getKey(), family.getValue(), cells);
+            }
+        }
+
+        /** Adds a cell for every qualifier of one column family. */
+        public static void addAllQualifiers(byte[] rowKey, byte[] columnFamily,
+                                      NavigableMap<byte[], byte[]> qualifierToValueMap, List<Cell> cells) {
+            for (Map.Entry<byte[], byte[]> column : qualifierToValueMap.entrySet()) {
+                cells.add(new KeyValue(rowKey, columnFamily, column.getKey(), column.getValue()));
+            }
+        }
+
+    }
+
+    /**
+     * Mockito answer for the mocked batch get: resolves every {@link Get} in the argument list
+     * against the in-memory map and returns one {@link Result} per request, in request order.
+     */
+    static class BatchGetAnswer implements Answer<Result[]> {
+        private final ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> mockMap;
+
+        public BatchGetAnswer(ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> mockMap) {
+            this.mockMap = mockMap;
+        }
+
+        @Override
+        public Result[] answer(InvocationOnMock invocationOnMock) throws Throwable {
+            List<Get> gets = (List<Get>) invocationOnMock.getArguments()[0];
+
+            Result[] results = new Result[gets.size()];
+            int index = 0;
+            for (Get get : gets) {
+                results[index++] = lookup(get);
+            }
+            return results;
+        }
+
+        /** Builds the result for one get; an unknown row yields {@code Result.EMPTY_RESULT}. */
+        private Result lookup(Get get) {
+            byte[] rowKey = get.getRow();
+            NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValueMap = mockMap.get(rowKey);
+            if (cfToQualifierToValueMap == null) {
+                return Result.EMPTY_RESULT;
+            }
+
+            Map<byte[], NavigableSet<byte[]>> familyMap = get.getFamilyMap();
+            List<Cell> cells = new ArrayList<>();
+            if (familyMap == null || familyMap.isEmpty()) {
+                // all column families
+                BuildCellsHelper.addAllColumnFamilies(rowKey, cfToQualifierToValueMap, cells);
+            } else {
+                // one or more column families
+                BuildCellsHelper.addMatchingColumnFamilies(rowKey, familyMap, cfToQualifierToValueMap, cells);
+            }
+
+            // Result.create() states that "You must ensure that the keyvalues are already sorted."
+            Collections.sort(cells, new KeyValue.KVComparator());
+            return Result.create(cells);
+        }
+    }
+
+    /**
+     * Mockito answer emulating a batched mutate (puts and deletes) against the in-memory table
+     * (row key -> column family -> qualifier -> value).
+     *
+     * Only {@link Put} and {@link Delete} are understood; anything else raises
+     * {@link IllegalStateException}.
+     */
+    static class BatchMutateAnswer implements Answer {
+        private static final String UNSUPPORTED = "Not supported in mocked mutate.";
+
+        private final ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> mockMap;
+
+        public BatchMutateAnswer(ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> mockMap) {
+            this.mockMap = mockMap;
+        }
+
+        /**
+         * Applies every mutation of the first invocation argument to the backing map, in order.
+         *
+         * @param invocationOnMock the intercepted call; its first argument is the list of mutations
+         * @return always {@code null}
+         * @throws IllegalStateException if a mutation is neither a put with cells nor a delete
+         */
+        @Override
+        public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+            Object[] args = invocationOnMock.getArguments();
+            List<Mutation> param = (List<Mutation>) args[0];
+
+            // assumption: the parameter list never holds a put and a delete for the same target
+            for (Mutation mutation : param) {
+                byte[] rowKey = mutation.getRow();
+
+                NavigableMap<byte[], List<Cell>> familyCellMap = mutation.getFamilyCellMap();
+                if (familyCellMap == null || familyCellMap.isEmpty()) {
+                    // a mutation without any family addresses the whole row: only deletes make sense
+                    if (!(mutation instanceof Delete)) {
+                        throw new IllegalStateException(UNSUPPORTED);
+                    }
+                    deleteRow(rowKey);
+                    // nothing left to walk; also keeps a null family map away from the loop below
+                    continue;
+                }
+
+                for (Map.Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
+                    applyToFamily(mutation, rowKey, entry.getKey(), entry.getValue());
+                }
+            }
+
+            return null;
+        }
+
+        /** Applies the cells of one column family, or drops the family when a delete names no cells. */
+        private void applyToFamily(Mutation mutation, byte[] rowKey, byte[] columnFamily, List<Cell> cells) {
+            if (cells == null || cells.isEmpty()) {
+                if (!(mutation instanceof Delete)) {
+                    throw new IllegalStateException(UNSUPPORTED);
+                }
+                deleteColumnFamily(rowKey, columnFamily);
+                return;
+            }
+
+            for (Cell cell : cells) {
+                byte[] qualifier = CellUtil.cloneQualifier(cell);
+
+                if (mutation instanceof Put) {
+                    putCell(rowKey, columnFamily, qualifier, CellUtil.cloneValue(cell));
+                } else if (mutation instanceof Delete) {
+                    deleteCell(rowKey, columnFamily, qualifier);
+                } else {
+                    throw new IllegalStateException(UNSUPPORTED);
+                }
+            }
+        }
+
+        /** Stores a value, creating the row and column family levels on demand. */
+        private void putCell(byte[] rowKey, byte[] columnFamily, byte[] qualifier, byte[] value) {
+            NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValue = mockMap.get(rowKey);
+            if (cfToQualifierToValue == null) {
+                cfToQualifierToValue = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
+                mockMap.put(rowKey, cfToQualifierToValue);
+            }
+
+            NavigableMap<byte[], byte[]> qualifierToValue = cfToQualifierToValue.get(columnFamily);
+            if (qualifierToValue == null) {
+                qualifierToValue = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
+                cfToQualifierToValue.put(columnFamily, qualifierToValue);
+            }
+
+            qualifierToValue.put(qualifier, value);
+        }
+
+        /** Removes a whole row; unknown rows are ignored. */
+        private void deleteRow(byte[] rowKey) {
+            mockMap.remove(rowKey);
+        }
+
+        /** Removes one column family of a row; unknown rows are ignored. */
+        private void deleteColumnFamily(byte[] rowKey, byte[] columnFamily) {
+            NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValue = mockMap.get(rowKey);
+            if (cfToQualifierToValue != null) {
+                cfToQualifierToValue.remove(columnFamily);
+            }
+        }
+
+        /** Removes a single cell; unknown rows or column families are ignored. */
+        private void deleteCell(byte[] rowKey, byte[] columnFamily, byte[] qualifier) {
+            NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValue = mockMap.get(rowKey);
+            if (cfToQualifierToValue != null) {
+                NavigableMap<byte[], byte[]> qualifierToValue = cfToQualifierToValue.get(columnFamily);
+                if (qualifierToValue != null) {
+                    qualifierToValue.remove(qualifier);
+                }
+            }
+        }
+    }
+
+    /**
+     * Mockito answer emulating a row existence check against the in-memory table.
+     * Only row-level checks are supported: a {@link Get} restricted to column families is rejected.
+     */
+    static class ExistsAnswer implements Answer<Boolean> {
+        private final ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> mockMap;
+
+        public ExistsAnswer(ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> mockMap) {
+            this.mockMap = mockMap;
+        }
+
+        /**
+         * Reports whether the row addressed by the first invocation argument is stored.
+         *
+         * @param invocationOnMock the intercepted call; its first argument is the {@link Get}
+         * @return {@code true} if the backing map holds the row key
+         * @throws IllegalStateException if the get names one or more column families
+         */
+        @Override
+        public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
+            Object[] args = invocationOnMock.getArguments();
+            Get param = (Get) args[0];
+
+            // assume that Get doesn't have any families defined. this is for not digging deeply...
+            byte[] rowKey = param.getRow();
+            Map<byte[], NavigableSet<byte[]>> familyMap = param.getFamilyMap();
+            // a missing family map is treated like an empty one, as the batched get answer does
+            if (familyMap != null && !familyMap.isEmpty()) {
+                throw new IllegalStateException("Not supported in mocked exists.");
+            }
+
+            return mockMap.containsKey(rowKey);
+        }
+    }
+
+    static class ScanAnswer implements Answer<ResultScanner> {
+        private final ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> mockMap;
+
+        public ScanAnswer(ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> internalMap) {
+            this.mockMap = internalMap;
+        }
+
+        @Override
+        public ResultScanner answer(InvocationOnMock invocationOnMock) throws Throwable {
+            Object[] args = invocationOnMock.getArguments();
+            byte[] startKey = (byte[]) args[0];
+            byte[] endKey = (byte[]) args[1];
+
+            final ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> subMap =
+                    mockMap.subMap(startKey, true, endKey, false);
+
+            final List<Result> results = buildResults(subMap);
+
+            return new MockedResultScanner(results);
+        }
+
+        private List<Result> buildResults(ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> subMap) {
+            final List<Result> results = new ArrayList<>();
+            for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> entry : subMap.entrySet()) {
+                byte[] rowKey = entry.getKey();
+                NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValueMap = entry.getValue();
+                List<Cell> cells = new ArrayList<>();
+                // all column families
+                BuildCellsHelper.addAllColumnFamilies(rowKey, cfToQualifierToValueMap, cells);
+
+                // Result.create() states that "You must ensure that the keyvalues are already sorted."
+                Collections.sort(cells, new KeyValue.KVComparator());
+                results.add(Result.create(cells));
+            }
+            return results;
+        }
+
+        static class MockedResultScanner implements ResultScanner {
+
+            private final List<Result> results;
+            private int position = 0;
+
+            MockedResultScanner(List<Result> results) {
+                this.results = results;
+            }
+
+            @Override
+            public Result next() throws IOException {
+                if (results.size() <= position) {
+                    return null;
+                }
+                return results.get(position++);
+            }
+
+            @Override
+            public Result[] next(int nbRows) throws IOException {
+                List<Result> bulkResult = new ArrayList<>();
+                for (int i = 0 ; i < nbRows ; i++) {
+                    Result result = next();
+                    if (result == null) {
+                        break;
+                    }
+
+                    bulkResult.add(result);
+                }
+                return bulkResult.toArray(new Result[0]);
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Iterator<Result> iterator() {
+                return results.iterator();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java
new file mode 100644
index 0000000..d9a2ed8
--- /dev/null
+++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.state;
+
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.storm.hbase.common.ColumnList;
+import org.apache.storm.hbase.common.HBaseClient;
+import org.apache.storm.state.DefaultStateEncoder;
+import org.apache.storm.state.DefaultStateSerializer;
+import org.apache.storm.state.Serializer;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link HBaseKeyValueStateIterator}, run against the mocked HBase client
+ * obtained from {@link HBaseClientTestUtil}.
+ */
+public class HBaseKeyValueStateIteratorTest {
+
+    private String namespace;
+    private byte[] keyNamespace;
+    private byte[] columnFamily;
+    private HBaseClient mockHBaseClient;
+    private int chunkSize = 1000;
+    private Serializer<byte[]> keySerializer = new DefaultStateSerializer<>();
+    private Serializer<byte[]> valueSerializer = new DefaultStateSerializer<>();
+    private DefaultStateEncoder<byte[], byte[]> encoder;
+
+    @Before
+    public void setUp() throws Exception {
+        namespace = "namespace";
+        keyNamespace = (namespace + "$key:").getBytes();
+        columnFamily = "cf".getBytes();
+        mockHBaseClient = HBaseClientTestUtil.mockedHBaseClient();
+        encoder = new DefaultStateEncoder<>(keySerializer, valueSerializer);
+    }
+
+    /** Entries that only live in HBase are returned in key order. */
+    @Test
+    public void testGetEntriesInHBase() throws Exception {
+        // pendingPrepare has no entries
+        NavigableMap<byte[], byte[]> pendingPrepare = getBinaryTreeMap();
+
+        // pendingCommit has no entries
+        NavigableMap<byte[], byte[]> pendingCommit = getBinaryTreeMap();
+
+        // HBase has some entries
+        NavigableMap<byte[], byte[]> chunkMap = getBinaryTreeMap();
+        putEncodedKeyValueToMap(chunkMap, "key0".getBytes(), "value0".getBytes());
+        putEncodedKeyValueToMap(chunkMap, "key2".getBytes(), "value2".getBytes());
+
+        applyPendingStateToHBase(chunkMap);
+
+        HBaseKeyValueStateIterator<byte[], byte[]> kvIterator = newIterator(pendingPrepare, pendingCommit);
+
+        assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes());
+
+        // key1 was never stored, so it must not show up in the iterator
+
+        assertNextEntry(kvIterator, "key2".getBytes(), "value2".getBytes());
+
+        assertFalse(kvIterator.hasNext());
+    }
+
+    /** A key present in several sources is returned once, and a tombstoned key not at all. */
+    @Test
+    public void testGetEntriesRemovingDuplicationKeys() throws Exception {
+        NavigableMap<byte[], byte[]> pendingPrepare = getBinaryTreeMap();
+        putEncodedKeyValueToMap(pendingPrepare, "key0".getBytes(), "value0".getBytes());
+        putTombstoneToMap(pendingPrepare, "key1".getBytes());
+
+        NavigableMap<byte[], byte[]> pendingCommit = getBinaryTreeMap();
+        putEncodedKeyValueToMap(pendingCommit, "key1".getBytes(), "value1".getBytes());
+        putEncodedKeyValueToMap(pendingCommit, "key2".getBytes(), "value2".getBytes());
+
+        NavigableMap<byte[], byte[]> chunkMap = getBinaryTreeMap();
+        putEncodedKeyValueToMap(chunkMap, "key2".getBytes(), "value2".getBytes());
+        putEncodedKeyValueToMap(chunkMap, "key3".getBytes(), "value3".getBytes());
+        putEncodedKeyValueToMap(chunkMap, "key4".getBytes(), "value4".getBytes());
+
+        applyPendingStateToHBase(chunkMap);
+
+        HBaseKeyValueStateIterator<byte[], byte[]> kvIterator = newIterator(pendingPrepare, pendingCommit);
+
+        // keys shouldn't appear twice
+
+        assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes());
+
+        // key1 shouldn't be in iterator since it's marked as deleted
+
+        assertNextEntry(kvIterator, "key2".getBytes(), "value2".getBytes());
+        assertNextEntry(kvIterator, "key3".getBytes(), "value3".getBytes());
+        assertNextEntry(kvIterator, "key4".getBytes(), "value4".getBytes());
+
+        assertFalse(kvIterator.hasNext());
+    }
+
+    /** With nothing pending and nothing stored, the iterator is empty from the start. */
+    @Test
+    public void testGetEntryNotAvailable() {
+        NavigableMap<byte[], byte[]> pendingPrepare = getBinaryTreeMap();
+
+        NavigableMap<byte[], byte[]> pendingCommit = getBinaryTreeMap();
+
+        HBaseKeyValueStateIterator<byte[], byte[]> kvIterator = newIterator(pendingPrepare, pendingCommit);
+
+        assertFalse(kvIterator.hasNext());
+    }
+
+    /** Creates the iterator under test over the given pending maps and the mocked HBase client. */
+    private HBaseKeyValueStateIterator<byte[], byte[]> newIterator(NavigableMap<byte[], byte[]> pendingPrepare,
+                                                                   NavigableMap<byte[], byte[]> pendingCommit) {
+        return new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient,
+                pendingPrepare.entrySet().iterator(), pendingCommit.entrySet().iterator(), chunkSize,
+                keySerializer, valueSerializer);
+    }
+
+    /** Asserts that the iterator has a further entry and that it carries the expected key and value. */
+    private void assertNextEntry(HBaseKeyValueStateIterator<byte[], byte[]> kvIterator, byte[] expectedKey,
+                                 byte[] expectedValue) {
+        assertTrue(kvIterator.hasNext());
+        Map.Entry<byte[], byte[]> entry = kvIterator.next();
+        assertArrayEquals(expectedKey, entry.getKey());
+        assertArrayEquals(expectedValue, entry.getValue());
+    }
+
+    private void putEncodedKeyValueToMap(NavigableMap<byte[], byte[]> map, byte[] key, byte[] value) {
+        map.put(encoder.encodeKey(key), encoder.encodeValue(value));
+    }
+
+    private void putTombstoneToMap(NavigableMap<byte[], byte[]> map, byte[] key) {
+        map.put(encoder.encodeKey(key), encoder.getTombstoneValue());
+    }
+
+    /** Returns an empty map ordered by unsigned lexicographical comparison of the byte[] keys. */
+    private TreeMap<byte[], byte[]> getBinaryTreeMap() {
+        return new TreeMap<>(UnsignedBytes.lexicographicalComparator());
+    }
+
+    /**
+     * Writes the given encoded state into the mocked HBase client in a single batch:
+     * tombstone values become row deletes, everything else a put of the state qualifier.
+     */
+    private void applyPendingStateToHBase(NavigableMap<byte[], byte[]> pendingMap) throws Exception {
+        List<Mutation> mutations = new ArrayList<>();
+        for (Map.Entry<byte[], byte[]> entry : pendingMap.entrySet()) {
+            byte[] rowKey = getRowKeyForStateKey(entry.getKey());
+            byte[] value = entry.getValue();
+
+            if (Arrays.equals(value, encoder.getTombstoneValue())) {
+                mutations.add(new Delete(rowKey));
+            } else {
+                mutations.addAll(prepareMutateRow(rowKey, columnFamily,
+                        Collections.singletonMap(STATE_QUALIFIER, value)));
+            }
+        }
+
+        mockHBaseClient.batchMutate(mutations);
+    }
+
+    /** Prefixes the encoded state key with the key namespace to form the HBase row key. */
+    private byte[] getRowKeyForStateKey(byte[] columnKey) {
+        byte[] rowKey = new byte[keyNamespace.length + columnKey.length];
+        System.arraycopy(keyNamespace, 0, rowKey, 0, keyNamespace.length);
+        System.arraycopy(columnKey, 0, rowKey, keyNamespace.length, columnKey.length);
+        return rowKey;
+    }
+
+    private ColumnList buildColumnList(byte[] columnFamily, Map<byte[], byte[]> map) {
+        ColumnList columnList = new ColumnList();
+        for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
+            columnList.addColumn(columnFamily, entry.getKey(), entry.getValue());
+        }
+        return columnList;
+    }
+
+    /** Builds the mutations that store the given qualifier/value pairs in one row, with default durability. */
+    private List<Mutation> prepareMutateRow(byte[] rowKey, byte[] columnFamily, Map<byte[], byte[]> map) {
+        ColumnList columnList = buildColumnList(columnFamily, map);
+        return mockHBaseClient.constructMutationReq(rowKey, columnList, Durability.USE_DEFAULT);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java
new file mode 100644
index 0000000..bd38e57
--- /dev/null
+++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.state;
+
+import org.apache.storm.Config;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for the provider configuration parsing of {@link HBaseKeyValueStateProvider}.
+ */
+public class HBaseKeyValueStateProviderTest {
+
+    /** Leading part shared by every provider configuration used below. */
+    private static final String KEY_AND_VALUE_CLASS = "{\"keyClass\":\"String\", \"valueClass\":\"String\",";
+
+    @Test
+    public void testConfigHBaseConfigKeyIsEmpty() throws Exception {
+        assertRejected(KEY_AND_VALUE_CLASS + " \"tableName\": \"table\", \"columnFamily\": \"cf\"}",
+                "hbaseConfigKey");
+    }
+
+    @Test
+    public void testConfigTableNameIsEmpty() throws Exception {
+        assertRejected(KEY_AND_VALUE_CLASS + " \"hbaseConfigKey\": \"hbaseConfKey\", \"columnFamily\": \"cf\"}",
+                "tableName");
+    }
+
+    @Test
+    public void testConfigColumnFamilyIsEmpty() throws Exception {
+        assertRejected(KEY_AND_VALUE_CLASS + " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\"}",
+                "columnFamily");
+    }
+
+    @Test
+    public void testValidProviderConfig() throws Exception {
+        String providerConfig = KEY_AND_VALUE_CLASS
+                + " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\","
+                + " \"columnFamily\": \"columnFamily\"}";
+
+        HBaseKeyValueStateProvider.StateConfig parsed =
+                new HBaseKeyValueStateProvider().getStateConfig(stormConfWith(providerConfig));
+
+        assertEquals("String", parsed.keyClass);
+        assertEquals("String", parsed.valueClass);
+        assertEquals("hbaseConfKey", parsed.hbaseConfigKey);
+        assertEquals("table", parsed.tableName);
+        assertEquals("columnFamily", parsed.columnFamily);
+    }
+
+    /**
+     * Expects the provider configuration to be refused with an {@link IllegalArgumentException}
+     * whose message mentions the given text.
+     */
+    private void assertRejected(String providerConfig, String expectedInMessage) throws Exception {
+        HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider();
+        Map<String, String> stormConf = stormConfWith(providerConfig);
+
+        try {
+            provider.getStateConfig(stormConf);
+            fail("IllegalArgumentException is expected here.");
+        } catch (IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains(expectedInMessage));
+        }
+    }
+
+    /** Wraps the provider configuration into a storm configuration map. */
+    private Map<String, String> stormConfWith(String providerConfig) {
+        Map<String, String> stormConf = new HashMap<>();
+        stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, providerConfig);
+        return stormConf;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java
new file mode 100644
index 0000000..7827283
--- /dev/null
+++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.state;
+
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.storm.hbase.common.HBaseClient;
+import org.apache.storm.state.DefaultStateSerializer;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link HBaseKeyValueState}
+ */
+public class HBaseKeyValueStateTest {
+    private static final String COLUMN_FAMILY = "cf";
+    private static final String NAMESPACE = "namespace";
+
+    HBaseClient mockClient;
+    HBaseKeyValueState<String, String> keyValueState;
+    ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> mockMap;
+
+    @Before
+    public void setUp() throws Exception {
+        mockMap = new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
+        mockClient = HBaseClientTestUtil.mockedHBaseClient(mockMap);
+        keyValueState = new HBaseKeyValueState<>(mockClient, COLUMN_FAMILY, NAMESPACE,
+                new DefaultStateSerializer<String>(), new DefaultStateSerializer<String>());
+    }
+
+    @Test
+    public void testPutAndGet() throws Exception {
+        keyValueState.put("a", "1");
+        keyValueState.put("b", "2");
+        assertEquals("1", keyValueState.get("a"));
+        assertEquals("2", keyValueState.get("b"));
+        assertEquals(null, keyValueState.get("c"));
+    }
+
+    @Test
+    public void testPutAndDelete() throws Exception {
+        keyValueState.put("a", "1");
+        keyValueState.put("b", "2");
+        assertEquals("1", keyValueState.get("a"));
+        assertEquals("2", keyValueState.get("b"));
+        assertEquals(null, keyValueState.get("c"));
+        assertEquals("1", keyValueState.delete("a"));
+        assertEquals(null, keyValueState.get("a"));
+        assertEquals("2", keyValueState.get("b"));
+        assertEquals(null, keyValueState.get("c"));
+    }
+
+    @Test
+    public void testPrepareCommitRollback() throws Exception {
+        keyValueState.put("a", "1");
+        keyValueState.put("b", "2");
+        keyValueState.prepareCommit(1);
+        keyValueState.put("c", "3");
+        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        keyValueState.rollback();
+        assertArrayEquals(new String[]{null, null, null}, getValues());
+        keyValueState.put("a", "1");
+        keyValueState.put("b", "2");
+        keyValueState.prepareCommit(1);
+        keyValueState.commit(1);
+        keyValueState.put("c", "3");
+        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        keyValueState.rollback();
+        assertArrayEquals(new String[]{"1", "2", null}, getValues());
+        keyValueState.put("c", "3");
+        assertEquals("2", keyValueState.delete("b"));
+        assertEquals("3", keyValueState.delete("c"));
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        keyValueState.prepareCommit(2);
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        keyValueState.commit(2);
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        keyValueState.put("b", "2");
+        keyValueState.prepareCommit(3);
+        keyValueState.put("c", "3");
+        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        keyValueState.rollback();
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+    }
+
+    private String[] getValues() {
+        return new String[]{
+                keyValueState.get("a"),
+                keyValueState.get("b"),
+                keyValueState.get("c")
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/947149f4/external/storm-redis/src/test/java/org/apache/storm/redis/state/DefaultStateSerializerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/DefaultStateSerializerTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/DefaultStateSerializerTest.java
deleted file mode 100644
index 7346989..0000000
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/DefaultStateSerializerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.state;
-
-import org.apache.storm.spout.CheckPointState;
-import org.apache.storm.state.DefaultStateSerializer;
-import org.apache.storm.state.Serializer;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * Unit tests for {@link DefaultStateSerializer}
- */
-public class DefaultStateSerializerTest {
-
-    @Test
-    public void testSerializeDeserialize() throws Exception {
-        Serializer<Long> s1 = new DefaultStateSerializer<Long>();
-        byte[] bytes;
-        long val = 100;
-        bytes = s1.serialize(val);
-        assertEquals(val, (long) s1.deserialize(bytes));
-
-        Serializer<CheckPointState> s2 = new DefaultStateSerializer<CheckPointState>();
-        CheckPointState cs = new CheckPointState(100, CheckPointState.State.COMMITTED);
-        bytes = s2.serialize(cs);
-        assertEquals(cs, (CheckPointState) s2.deserialize(bytes));
-
-        List<Class<?>> classesToRegister = new ArrayList<>();
-        classesToRegister.add(CheckPointState.class);
-        Serializer<CheckPointState> s3 = new DefaultStateSerializer<CheckPointState>(classesToRegister);
-        bytes = s2.serialize(cs);
-        assertEquals(cs, (CheckPointState) s2.deserialize(bytes));
-
-    }
-}
\ No newline at end of file