You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/04/28 23:00:37 UTC

[1/3] storm git commit: make user to specific the key and qualifier of storm-hbase HBaseMapState

Repository: storm
Updated Branches:
  refs/heads/master 72dede969 -> 44e9aaf57


make user to specific the key and qualifier of storm-hbase HBaseMapState


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6f509f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6f509f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6f509f8

Branch: refs/heads/master
Commit: e6f509f82c30eaed5f2b3a6db5340d1438020adc
Parents: 66be747
Author: dashengju <da...@qq.com>
Authored: Fri Dec 26 11:53:19 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Dec 26 11:53:19 2014 +0800

----------------------------------------------------------------------
 .../mapper/SimpleTridentHBaseMapMapper.java     | 50 ++++++++++++++++++++
 .../trident/mapper/TridentHBaseMapMapper.java   | 40 ++++++++++++++++
 .../hbase/trident/state/HBaseMapState.java      | 45 ++++++++----------
 3 files changed, 111 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e6f509f8/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
new file mode 100644
index 0000000..2262a79
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.trident.mapper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public class SimpleTridentHBaseMapMapper implements TridentHBaseMapMapper {
+    private String qualifier;
+
+    public SimpleTridentHBaseMapMapper(String qualifier) {
+        this.qualifier = qualifier;
+    }
+
+    @Override
+    public byte[] rowKey(List<Object> keys) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            for (Object key : keys) {
+                bos.write(String.valueOf(key).getBytes());
+            }
+            bos.close();
+        } catch (IOException e){
+            throw new RuntimeException("IOException creating HBase row key.", e);
+        }
+        return bos.toByteArray();
+    }
+
+    @Override
+    public String qualifier(List<Object> keys) {
+        return this.qualifier;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e6f509f8/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapMapper.java
new file mode 100644
index 0000000..0fb83c5
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.trident.mapper;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface TridentHBaseMapMapper extends Serializable {
+    /**
+     * Given a tuple's grouped key list, return the HBase rowkey.
+     *
+     * @param keys
+     * @return
+     */
+    public byte[] rowKey(List<Object> keys);
+
+    /**
+     * Given a tuple's grouped key list, return the HBase qualifier.
+     *
+     * @param keys
+     * @return
+     */
+    public String qualifier(List<Object> keys);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e6f509f8/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
index 742206b..8053cb2 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@ -26,12 +26,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.storm.hbase.security.HBaseSecurityUtil;
+import org.apache.storm.hbase.trident.mapper.TridentHBaseMapMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.trident.state.*;
 import storm.trident.state.map.*;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.Serializable;
@@ -102,7 +102,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
         public String configKey = "hbase.config";
         public String tableName;
         public String columnFamily;
-        public String qualifier;
+        public TridentHBaseMapMapper mapMapper;
     }
 
 
@@ -155,6 +155,10 @@ public class HBaseMapState<T> implements IBackingMap<T> {
             if (this.options.serializer == null) {
                 throw new RuntimeException("Serializer should be specified for type: " + stateType);
             }
+
+            if (this.options.mapMapper == null) {
+                throw new RuntimeException("MapMapper should be specified for type: " + stateType);
+            }
         }
 
         @SuppressWarnings({"rawtypes", "unchecked"})
@@ -189,17 +193,22 @@ public class HBaseMapState<T> implements IBackingMap<T> {
     public List<T> multiGet(List<List<Object>> keys) {
         List<Get> gets = new ArrayList<Get>();
         for(List<Object> key : keys){
-            LOG.info("Partition: {}, GET: {}", this.partitionNum, key);
-            Get get = new Get(toRowKey(key));
-            get.addColumn(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
+            byte[] hbaseKey = this.options.mapMapper.rowKey(key);
+            String qualifier = this.options.mapMapper.qualifier(key);
+
+            LOG.info("Partition: {}, GET: {}", this.partitionNum, new String(hbaseKey));
+            Get get = new Get(hbaseKey);
+            get.addColumn(this.options.columnFamily.getBytes(), qualifier.getBytes());
             gets.add(get);
         }
 
         List<T> retval = new ArrayList<T>();
         try {
             Result[] results = this.table.get(gets);
-            for (Result result : results) {
-                byte[] value = result.getValue(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
+            for (int i = 0; i < keys.size(); i++) {
+                String qualifier = this.options.mapMapper.qualifier(keys.get(i));
+                Result result = results[i];
+                byte[] value = result.getValue(this.options.columnFamily.getBytes(), qualifier.getBytes());
                 if(value != null) {
                     retval.add(this.serializer.deserialize(value));
                 } else {
@@ -216,11 +225,13 @@ public class HBaseMapState<T> implements IBackingMap<T> {
     public void multiPut(List<List<Object>> keys, List<T> values) {
         List<Put> puts = new ArrayList<Put>(keys.size());
         for (int i = 0; i < keys.size(); i++) {
-            LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, keys.get(i), new String(this.serializer.serialize(values.get(i)))});
-            Put put = new Put(toRowKey(keys.get(i)));
+            byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i));
+            String qualifier = this.options.mapMapper.qualifier(keys.get(i));
+            LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))});
+            Put put = new Put(hbaseKey);
             T val = values.get(i);
             put.add(this.options.columnFamily.getBytes(),
-                    this.options.qualifier.getBytes(),
+                    qualifier.getBytes(),
                     this.serializer.serialize(val));
 
             puts.add(put);
@@ -233,18 +244,4 @@ public class HBaseMapState<T> implements IBackingMap<T> {
             throw new FailedException("Retries exhaused while writing to HBase", e);
         }
     }
-
-
-    private byte[] toRowKey(List<Object> keys) {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        try {
-            for (Object key : keys) {
-                bos.write(String.valueOf(key).getBytes());
-            }
-            bos.close();
-        } catch (IOException e){
-            throw new RuntimeException("IOException creating HBase row key.", e);
-        }
-        return bos.toByteArray();
-    }
 }


[3/3] storm git commit: Added STORM-607 to Changelog

Posted by bo...@apache.org.
Added STORM-607 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/44e9aaf5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/44e9aaf5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/44e9aaf5

Branch: refs/heads/master
Commit: 44e9aaf57cec7ca7918d4f9a044cbcd84326d6af
Parents: bfe9569
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 28 15:59:57 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Apr 28 15:59:57 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/44e9aaf5/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 59a41bb..900319d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-607: storm-hbase HBaseMapState should support user to customize the hbase-key & hbase-qualifier
  * STORM-795: Update the user document for the extlib issue
  * STORM-801: Add Travis CI badge to README
  * STORM-797: DisruptorQueueTest has some race conditions in it.


[2/3] storm git commit: Merge branch 'branch_for_storm_hbase_extend' of https://github.com/dashengju/storm into STORM-607

Posted by bo...@apache.org.
Merge branch 'branch_for_storm_hbase_extend' of https://github.com/dashengju/storm into STORM-607

STORM-607: storm-hbase HBaseMapState should support user to customize the hbase-key & hbase-qualifier


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bfe95695
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bfe95695
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bfe95695

Branch: refs/heads/master
Commit: bfe9569517be7643aab7d27e106f2ff0bcc78f4b
Parents: 72dede9 e6f509f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 28 15:52:45 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Apr 28 15:52:45 2015 -0500

----------------------------------------------------------------------
 .../mapper/SimpleTridentHBaseMapMapper.java     | 50 ++++++++++++++++++++
 .../trident/mapper/TridentHBaseMapMapper.java   | 40 ++++++++++++++++
 .../hbase/trident/state/HBaseMapState.java      | 45 ++++++++----------
 3 files changed, 111 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bfe95695/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
----------------------------------------------------------------------
diff --cc external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
index a040e51,8053cb2..99ce4bf
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@@ -231,22 -242,6 +242,8 @@@ public class HBaseMapState<T> implement
              throw new FailedException("Interrupted while writing to HBase", e);
          } catch (RetriesExhaustedWithDetailsException e) {
              throw new FailedException("Retries exhaused while writing to HBase", e);
 +        } catch (IOException e) {
 +            throw new FailedException("IOException while writing to HBase", e);
          }
      }
- 
- 
-     private byte[] toRowKey(List<Object> keys) {
-         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-         try {
-             for (Object key : keys) {
-                 bos.write(String.valueOf(key).getBytes());
-             }
-             bos.close();
-         } catch (IOException e){
-             throw new RuntimeException("IOException creating HBase row key.", e);
-         }
-         return bos.toByteArray();
-     }
  }