You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/04/28 23:00:37 UTC
[1/3] storm git commit: allow users to specify the key and qualifier
of storm-hbase HBaseMapState
Repository: storm
Updated Branches:
refs/heads/master 72dede969 -> 44e9aaf57
allow users to specify the key and qualifier of storm-hbase HBaseMapState
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6f509f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6f509f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6f509f8
Branch: refs/heads/master
Commit: e6f509f82c30eaed5f2b3a6db5340d1438020adc
Parents: 66be747
Author: dashengju <da...@qq.com>
Authored: Fri Dec 26 11:53:19 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Dec 26 11:53:19 2014 +0800
----------------------------------------------------------------------
.../mapper/SimpleTridentHBaseMapMapper.java | 50 ++++++++++++++++++++
.../trident/mapper/TridentHBaseMapMapper.java | 40 ++++++++++++++++
.../hbase/trident/state/HBaseMapState.java | 45 ++++++++----------
3 files changed, 111 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e6f509f8/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
new file mode 100644
index 0000000..2262a79
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.trident.mapper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+public class SimpleTridentHBaseMapMapper implements TridentHBaseMapMapper {
+ private String qualifier;
+
+ public SimpleTridentHBaseMapMapper(String qualifier) {
+ this.qualifier = qualifier;
+ }
+
+ @Override
+ public byte[] rowKey(List<Object> keys) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ for (Object key : keys) {
+ bos.write(String.valueOf(key).getBytes());
+ }
+ bos.close();
+ } catch (IOException e){
+ throw new RuntimeException("IOException creating HBase row key.", e);
+ }
+ return bos.toByteArray();
+ }
+
+ @Override
+ public String qualifier(List<Object> keys) {
+ return this.qualifier;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e6f509f8/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapMapper.java
new file mode 100644
index 0000000..0fb83c5
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.trident.mapper;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface TridentHBaseMapMapper extends Serializable {
+ /**
+ * Given a tuple's grouped key list, return the HBase rowkey.
+ *
+ * @param keys
+ * @return
+ */
+ public byte[] rowKey(List<Object> keys);
+
+ /**
+ * Given a tuple's grouped key list, return the HBase qualifier.
+ *
+ * @param keys
+ * @return
+ */
+ public String qualifier(List<Object> keys);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e6f509f8/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
index 742206b..8053cb2 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@ -26,12 +26,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.storm.hbase.security.HBaseSecurityUtil;
+import org.apache.storm.hbase.trident.mapper.TridentHBaseMapMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.state.*;
import storm.trident.state.map.*;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Serializable;
@@ -102,7 +102,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
public String configKey = "hbase.config";
public String tableName;
public String columnFamily;
- public String qualifier;
+ public TridentHBaseMapMapper mapMapper;
}
@@ -155,6 +155,10 @@ public class HBaseMapState<T> implements IBackingMap<T> {
if (this.options.serializer == null) {
throw new RuntimeException("Serializer should be specified for type: " + stateType);
}
+
+ if (this.options.mapMapper == null) {
+ throw new RuntimeException("MapMapper should be specified for type: " + stateType);
+ }
}
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -189,17 +193,22 @@ public class HBaseMapState<T> implements IBackingMap<T> {
public List<T> multiGet(List<List<Object>> keys) {
List<Get> gets = new ArrayList<Get>();
for(List<Object> key : keys){
- LOG.info("Partition: {}, GET: {}", this.partitionNum, key);
- Get get = new Get(toRowKey(key));
- get.addColumn(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
+ byte[] hbaseKey = this.options.mapMapper.rowKey(key);
+ String qualifier = this.options.mapMapper.qualifier(key);
+
+ LOG.info("Partition: {}, GET: {}", this.partitionNum, new String(hbaseKey));
+ Get get = new Get(hbaseKey);
+ get.addColumn(this.options.columnFamily.getBytes(), qualifier.getBytes());
gets.add(get);
}
List<T> retval = new ArrayList<T>();
try {
Result[] results = this.table.get(gets);
- for (Result result : results) {
- byte[] value = result.getValue(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
+ for (int i = 0; i < keys.size(); i++) {
+ String qualifier = this.options.mapMapper.qualifier(keys.get(i));
+ Result result = results[i];
+ byte[] value = result.getValue(this.options.columnFamily.getBytes(), qualifier.getBytes());
if(value != null) {
retval.add(this.serializer.deserialize(value));
} else {
@@ -216,11 +225,13 @@ public class HBaseMapState<T> implements IBackingMap<T> {
public void multiPut(List<List<Object>> keys, List<T> values) {
List<Put> puts = new ArrayList<Put>(keys.size());
for (int i = 0; i < keys.size(); i++) {
- LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, keys.get(i), new String(this.serializer.serialize(values.get(i)))});
- Put put = new Put(toRowKey(keys.get(i)));
+ byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i));
+ String qualifier = this.options.mapMapper.qualifier(keys.get(i));
+ LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))});
+ Put put = new Put(hbaseKey);
T val = values.get(i);
put.add(this.options.columnFamily.getBytes(),
- this.options.qualifier.getBytes(),
+ qualifier.getBytes(),
this.serializer.serialize(val));
puts.add(put);
@@ -233,18 +244,4 @@ public class HBaseMapState<T> implements IBackingMap<T> {
throw new FailedException("Retries exhaused while writing to HBase", e);
}
}
-
-
- private byte[] toRowKey(List<Object> keys) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try {
- for (Object key : keys) {
- bos.write(String.valueOf(key).getBytes());
- }
- bos.close();
- } catch (IOException e){
- throw new RuntimeException("IOException creating HBase row key.", e);
- }
- return bos.toByteArray();
- }
}
[3/3] storm git commit: Added STORM-607 to Changelog
Posted by bo...@apache.org.
Added STORM-607 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/44e9aaf5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/44e9aaf5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/44e9aaf5
Branch: refs/heads/master
Commit: 44e9aaf57cec7ca7918d4f9a044cbcd84326d6af
Parents: bfe9569
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 28 15:59:57 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Apr 28 15:59:57 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/44e9aaf5/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 59a41bb..900319d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-607: storm-hbase HBaseMapState should support user to customize the hbase-key & hbase-qualifier
* STORM-795: Update the user document for the extlib issue
* STORM-801: Add Travis CI badge to README
* STORM-797: DisruptorQueueTest has some race conditions in it.
[2/3] storm git commit: Merge branch 'branch_for_storm_hbase_extend'
of https://github.com/dashengju/storm into STORM-607
Posted by bo...@apache.org.
Merge branch 'branch_for_storm_hbase_extend' of https://github.com/dashengju/storm into STORM-607
STORM-607: storm-hbase HBaseMapState should allow users to customize the hbase-key & hbase-qualifier
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bfe95695
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bfe95695
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bfe95695
Branch: refs/heads/master
Commit: bfe9569517be7643aab7d27e106f2ff0bcc78f4b
Parents: 72dede9 e6f509f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 28 15:52:45 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Apr 28 15:52:45 2015 -0500
----------------------------------------------------------------------
.../mapper/SimpleTridentHBaseMapMapper.java | 50 ++++++++++++++++++++
.../trident/mapper/TridentHBaseMapMapper.java | 40 ++++++++++++++++
.../hbase/trident/state/HBaseMapState.java | 45 ++++++++----------
3 files changed, 111 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bfe95695/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
----------------------------------------------------------------------
diff --cc external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
index a040e51,8053cb2..99ce4bf
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@@ -231,22 -242,6 +242,8 @@@ public class HBaseMapState<T> implement
throw new FailedException("Interrupted while writing to HBase", e);
} catch (RetriesExhaustedWithDetailsException e) {
throw new FailedException("Retries exhaused while writing to HBase", e);
+ } catch (IOException e) {
+ throw new FailedException("IOException while writing to HBase", e);
}
}
-
-
- private byte[] toRowKey(List<Object> keys) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try {
- for (Object key : keys) {
- bos.write(String.valueOf(key).getBytes());
- }
- bos.close();
- } catch (IOException e){
- throw new RuntimeException("IOException creating HBase row key.", e);
- }
- return bos.toByteArray();
- }
}