You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/10/02 01:07:34 UTC
[14/26] git commit: First pass at an HBase-backed MapState
First pass at an HBase-backed MapState
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/20c4565d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/20c4565d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/20c4565d
Branch: refs/heads/master
Commit: 20c4565dbdbbb916d4143a9262c0869c566221bc
Parents: baffe4a
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Apr 23 17:17:06 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Apr 23 17:17:06 2014 -0400
----------------------------------------------------------------------
pom.xml | 8 +-
.../storm/hbase/bolt/mapper/HBaseMapper.java | 2 +-
.../hbase/bolt/mapper/SimpleHBaseMapper.java | 2 +-
.../mapper/SimpleTridentHBaseMapper.java | 89 +++++++
.../trident/mapper/TridentHBaseMapper.java | 31 +++
.../hbase/trident/state/HBaseMapState.java | 234 +++++++++++++++++++
6 files changed, 362 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 43113ca..9ac0761 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,17 +48,21 @@
</developer>
</developers>
+ <properties>
+ <hbase.version>0.98.1-hadoop2</hbase.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
- <version>0.9.1-incubating</version>
+ <version>0.9.2-incubating-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
- <version>0.98.0-hadoop2</version>
+ <version>${hbase.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java b/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
index 1763e1b..626ce96 100644
--- a/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
+++ b/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
@@ -24,7 +24,7 @@ import org.apache.storm.hbase.common.ColumnList;
import java.io.Serializable;
/**
- * Maps a <code>backtype.storm.tuple.Tupe</code> object
+ * Maps a <code>backtype.storm.tuple.Tuple</code> object
* to a row in an HBase table.
*/
public interface HBaseMapper extends Serializable {
http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java b/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
index 7624e51..da0efd4 100644
--- a/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
+++ b/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
@@ -60,7 +60,7 @@ public class SimpleHBaseMapper implements HBaseMapper {
return this;
}
-// public SimpleHBaseMapper withTimestampField(String timestampField){
+// public SimpleHBaseMapper withTimestampField(String timestampField){
// this.timestampField = timestampField;
// return this;
// }
http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java b/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
new file mode 100644
index 0000000..be3ab95
--- /dev/null
+++ b/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.trident.mapper;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
+import org.apache.storm.hbase.common.ColumnList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.tuple.TridentTuple;
+
+import static org.apache.storm.hbase.common.Utils.toBytes;
+import static org.apache.storm.hbase.common.Utils.toLong;
+
+/**
+ * A {@link TridentHBaseMapper} that takes the row key from a single tuple field and
+ * writes a configurable set of tuple fields as regular columns and/or counters, all
+ * within one column family.
+ * <p>
+ * Configure it fluently, for example
+ * {@code new SimpleTridentHBaseMapper().withRowKeyField("word").withColumnFamily("cf")
+ * .withCounterFields(new Fields("count"))}.
+ */
+public class SimpleTridentHBaseMapper implements TridentHBaseMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleTridentHBaseMapper.class);
+
+    /** Fixed encoding for family/qualifier names so the bytes do not depend on the JVM default charset. */
+    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");
+
+    private String rowKeyField;
+    private byte[] columnFamily;
+    private Fields columnFields;
+    private Fields counterFields;
+
+    public SimpleTridentHBaseMapper(){
+    }
+
+    /**
+     * Sets the tuple field whose value is converted to the row key.
+     *
+     * @param rowKeyField name of the tuple field holding the row key
+     * @return this mapper, for chaining
+     */
+    public SimpleTridentHBaseMapper withRowKeyField(String rowKeyField){
+        this.rowKeyField = rowKeyField;
+        return this;
+    }
+
+    /**
+     * Sets the tuple fields written as regular columns (qualifier = field name).
+     *
+     * @param columnFields fields to write as columns
+     * @return this mapper, for chaining
+     */
+    public SimpleTridentHBaseMapper withColumnFields(Fields columnFields){
+        this.columnFields = columnFields;
+        return this;
+    }
+
+    /**
+     * Sets the tuple fields written as counters (qualifier = field name, value converted to a long).
+     *
+     * @param counterFields fields to write as counters
+     * @return this mapper, for chaining
+     */
+    public SimpleTridentHBaseMapper withCounterFields(Fields counterFields){
+        this.counterFields = counterFields;
+        return this;
+    }
+
+    /**
+     * Sets the column family used for every column and counter produced by this mapper.
+     *
+     * @param columnFamily column family name, encoded as UTF-8
+     * @return this mapper, for chaining
+     */
+    public SimpleTridentHBaseMapper withColumnFamily(String columnFamily){
+        this.columnFamily = columnFamily.getBytes(UTF_8);
+        return this;
+    }
+
+
+    /**
+     * Returns the bytes of the configured row key field's value.
+     *
+     * @param tuple the tuple being written
+     * @return the row key
+     * @throws IllegalStateException if no row key field has been configured
+     */
+    @Override
+    public byte[] rowKey(TridentTuple tuple) {
+        if (this.rowKeyField == null) {
+            throw new IllegalStateException("No row key field configured; call withRowKeyField() first.");
+        }
+        Object objVal = tuple.getValueByField(this.rowKeyField);
+        return toBytes(objVal);
+    }
+
+    /**
+     * Builds the columns and counters for {@code tuple}. The list is empty when neither
+     * column fields nor counter fields are configured.
+     *
+     * @param tuple the tuple being written
+     * @return columns and counters to write
+     * @throws IllegalStateException if fields are configured but no column family is
+     */
+    @Override
+    public ColumnList columns(TridentTuple tuple) {
+        ColumnList cols = new ColumnList();
+        // Without a family every column would be added with a null family and fail later inside HBase.
+        if ((this.columnFields != null || this.counterFields != null) && this.columnFamily == null) {
+            throw new IllegalStateException("No column family configured; call withColumnFamily() first.");
+        }
+        if(this.columnFields != null){
+            // TODO timestamps
+            for(String field : this.columnFields){
+                cols.addColumn(this.columnFamily, field.getBytes(UTF_8), toBytes(tuple.getValueByField(field)));
+            }
+        }
+        if(this.counterFields != null){
+            for(String field : this.counterFields){
+                cols.addCounter(this.columnFamily, field.getBytes(UTF_8), toLong(tuple.getValueByField(field)));
+            }
+        }
+        return cols;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java b/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
new file mode 100644
index 0000000..66bf65e
--- /dev/null
+++ b/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
@@ -0,0 +1,31 @@
+package org.apache.storm.hbase.trident.mapper;
+
+
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.hbase.common.ColumnList;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+/**
+ * Maps a <code>storm.trident.tuple.TridentTuple</code> object
+ * to a row in an HBase table.
+ * <p>
+ * The interface extends {@link Serializable}, so implementations must be serializable too.
+ */
+public interface TridentHBaseMapper extends Serializable {
+
+
+    /**
+     * Given a tuple, return the HBase rowkey.
+     *
+     * @param tuple the Trident tuple being written
+     * @return the row key bytes for {@code tuple}
+     */
+    byte[] rowKey(TridentTuple tuple);
+
+    /**
+     * Given a tuple, return a list of HBase columns to insert.
+     *
+     * @param tuple the Trident tuple being written
+     * @return the columns to write for {@code tuple}; a {@code ColumnList} may hold both
+     *         regular columns and counters
+     */
+    ColumnList columns(TridentTuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
new file mode 100644
index 0000000..832b8df
--- /dev/null
+++ b/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@ -0,0 +1,234 @@
+package org.apache.storm.hbase.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.storm.hbase.security.HBaseSecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.Serializable;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class HBaseMapState<T> implements IBackingMap<T> {
+ private static Logger LOG = LoggerFactory.getLogger(HBaseMapState.class);
+
+ private int partitionNum;
+
+
+ @SuppressWarnings("rawtypes")
+ private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps.newHashMap();
+
+ static {
+ DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer());
+ DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
+ DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
+ }
+
+ private Options<T> options;
+ private Serializer<T> serializer;
+ private HTable table;
+
+ public HBaseMapState(final Options<T> options, Map map, int partitionNum) {
+ this.options = options;
+ this.serializer = options.serializer;
+ this.partitionNum = partitionNum;
+
+ final Configuration hbConfig = HBaseConfiguration.create();
+ Map<String, Object> conf = (Map<String, Object>)map.get(options.configKey);
+ if(conf == null){
+ LOG.info("HBase configuration not found using key '" + options.configKey + "'");
+ LOG.info("Using HBase config from first hbase-site.xml found on classpath.");
+ } else {
+ if (conf.get("hbase.rootdir") == null) {
+ LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
+ }
+ for (String key : conf.keySet()) {
+ hbConfig.set(key, String.valueOf(map.get(key)));
+ }
+ }
+
+ try{
+ UserProvider provider = HBaseSecurityUtil.login(map, hbConfig);
+ this.table = provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<HTable>() {
+ @Override
+ public HTable run() throws IOException {
+ return new HTable(hbConfig, options.tableName);
+ }
+ });
+ } catch(Exception e){
+ throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e);
+ }
+
+ }
+
+
+ public static class Options<T> implements Serializable {
+
+ public Serializer<T> serializer = null;
+ public int cacheSize = 5000;
+ public String globalKey = "$HBASE_STATE_GLOBAL$";
+ public String configKey = "hbase.config";
+ public String tableName;
+ public String columnFamily;
+ public String qualifier;
+
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ public static StateFactory opaque() {
+ Options<OpaqueValue> options = new Options<OpaqueValue>();
+ return opaque(options);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static StateFactory opaque(Options<OpaqueValue> opts) {
+
+ return new Factory(StateType.OPAQUE, opts);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static StateFactory transactional() {
+ Options<TransactionalValue> options = new Options<TransactionalValue>();
+ return transactional(options);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static StateFactory transactional(Options<TransactionalValue> opts) {
+ return new Factory(StateType.TRANSACTIONAL, opts);
+ }
+
+ public static StateFactory nonTransactional() {
+ Options<Object> options = new Options<Object>();
+ return nonTransactional(options);
+ }
+
+ public static StateFactory nonTransactional(Options<Object> opts) {
+ return new Factory(StateType.NON_TRANSACTIONAL, opts);
+ }
+
+
+ protected static class Factory implements StateFactory {
+ private StateType stateType;
+ private Options options;
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public Factory(StateType stateType, Options options) {
+ this.stateType = stateType;
+ this.options = options;
+
+ if (this.options.serializer == null) {
+ this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
+ }
+
+ if (this.options.serializer == null) {
+ throw new RuntimeException("Serializer should be specified for type: " + stateType);
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ LOG.info("Preparing HBase State for partition {} of {}.", partitionIndex + 1, numPartitions);
+ IBackingMap state = new HBaseMapState(options, conf, partitionIndex);
+
+ if(options.cacheSize > 0) {
+ state = new CachedMap(state, options.cacheSize);
+ }
+
+ MapState mapState;
+ switch (stateType) {
+ case NON_TRANSACTIONAL:
+ mapState = NonTransactionalMap.build(state);
+ break;
+ case OPAQUE:
+ mapState = OpaqueMap.build(state);
+ break;
+ case TRANSACTIONAL:
+ mapState = TransactionalMap.build(state);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown state type: " + stateType);
+ }
+ return new SnapshottableMap(mapState, new Values(options.globalKey));
+ }
+
+ }
+
+ @Override
+ public List<T> multiGet(List<List<Object>> keys) {
+ List<Get> gets = new ArrayList<Get>();
+ for(List<Object> key : keys){
+ LOG.info("Partition: {}, GET: {}", this.partitionNum, key);
+ Get get = new Get(toRowKey(key));
+ get.addColumn(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
+ gets.add(get);
+ }
+
+ List<T> retval = new ArrayList<T>();
+ try {
+ Result[] results = this.table.get(gets);
+ for (Result result : results) {
+ byte[] value = result.getValue(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
+ if(value != null) {
+ retval.add(this.serializer.deserialize(value));
+ } else {
+ retval.add(null);
+ }
+ }
+ } catch(IOException e){
+ throw new FailedException("IOException while reading from HBase.", e);
+ }
+ return retval;
+ }
+
+ @Override
+ public void multiPut(List<List<Object>> keys, List<T> values) {
+ List<Put> puts = new ArrayList<Put>(keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, keys.get(i), new String(this.serializer.serialize(values.get(i)))});
+ Put put = new Put(toRowKey(keys.get(i)));
+ T val = values.get(i);
+ put.add(this.options.columnFamily.getBytes(),
+ this.options.qualifier.getBytes(),
+ this.serializer.serialize(val));
+
+ puts.add(put);
+ }
+ try {
+ this.table.put(puts);
+ } catch (InterruptedIOException e) {
+ throw new FailedException("Interrupted while writing to HBase", e);
+ } catch (RetriesExhaustedWithDetailsException e) {
+ throw new FailedException("Retries exhaused while writing to HBase", e);
+ }
+ }
+
+
+ private byte[] toRowKey(List<Object> keys) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ for (Object key : keys) {
+ bos.write(String.valueOf(key).getBytes());
+ }
+ bos.close();
+ } catch (IOException e){
+ throw new RuntimeException("IOException creating HBase row key.", e);
+ }
+ return bos.toByteArray();
+ }
+}