Posted to commits@storm.apache.org by pt...@apache.org on 2014/10/02 01:07:34 UTC

[14/26] git commit: First pass at an HBase-backed MapState

First pass at an HBase-backed MapState


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/20c4565d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/20c4565d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/20c4565d

Branch: refs/heads/master
Commit: 20c4565dbdbbb916d4143a9262c0869c566221bc
Parents: baffe4a
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Apr 23 17:17:06 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Apr 23 17:17:06 2014 -0400

----------------------------------------------------------------------
 pom.xml                                         |   8 +-
 .../storm/hbase/bolt/mapper/HBaseMapper.java    |   2 +-
 .../hbase/bolt/mapper/SimpleHBaseMapper.java    |   2 +-
 .../mapper/SimpleTridentHBaseMapper.java        |  89 +++++++
 .../trident/mapper/TridentHBaseMapper.java      |  31 +++
 .../hbase/trident/state/HBaseMapState.java      | 234 +++++++++++++++++++
 6 files changed, 362 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 43113ca..9ac0761 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,17 +48,21 @@
         </developer>
     </developers>
 
+    <properties>
+        <hbase.version>0.98.1-hadoop2</hbase.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
-            <version>0.9.1-incubating</version>
+            <version>0.9.2-incubating-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
-            <version>0.98.0-hadoop2</version>
+            <version>${hbase.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java b/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
index 1763e1b..626ce96 100644
--- a/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
+++ b/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
@@ -24,7 +24,7 @@ import org.apache.storm.hbase.common.ColumnList;
 import java.io.Serializable;
 
 /**
- * Maps a <code>backtype.storm.tuple.Tupe</code> object
+ * Maps a <code>backtype.storm.tuple.Tuple</code> object
  * to a row in an HBase table.
  */
 public interface HBaseMapper extends Serializable {

http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java b/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
index 7624e51..da0efd4 100644
--- a/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
+++ b/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
@@ -60,7 +60,7 @@ public class SimpleHBaseMapper implements HBaseMapper {
         return this;
     }
 
-//    public SimpleHBaseMapper withTimestampField(String timestampField){
+//    public SimpleTridentHBaseMapper withTimestampField(String timestampField){
 //        this.timestampField = timestampField;
 //        return this;
 //    }

http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java b/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
new file mode 100644
index 0000000..be3ab95
--- /dev/null
+++ b/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.trident.mapper;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
+import org.apache.storm.hbase.common.ColumnList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.tuple.TridentTuple;
+
+import static org.apache.storm.hbase.common.Utils.toBytes;
+import static org.apache.storm.hbase.common.Utils.toLong;
+
+/**
+ * Default TridentHBaseMapper implementation that maps configured tuple fields to columns and counter increments within a single column family.
+ */
+public class SimpleTridentHBaseMapper implements TridentHBaseMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleTridentHBaseMapper.class);
+    
+    private String rowKeyField;
+    private byte[] columnFamily;
+    private Fields columnFields;
+    private Fields counterFields;
+
+    public SimpleTridentHBaseMapper(){
+    }
+
+
+    public SimpleTridentHBaseMapper withRowKeyField(String rowKeyField){
+        this.rowKeyField = rowKeyField;
+        return this;
+    }
+
+    public SimpleTridentHBaseMapper withColumnFields(Fields columnFields){
+        this.columnFields = columnFields;
+        return this;
+    }
+
+    public SimpleTridentHBaseMapper withCounterFields(Fields counterFields){
+        this.counterFields = counterFields;
+        return this;
+    }
+
+    public SimpleTridentHBaseMapper withColumnFamily(String columnFamily){
+        this.columnFamily = columnFamily.getBytes();
+        return this;
+    }
+
+
+    @Override
+    public byte[] rowKey(TridentTuple tuple) {
+        Object objVal = tuple.getValueByField(this.rowKeyField);
+        return toBytes(objVal);
+    }
+
+    @Override
+    public ColumnList columns(TridentTuple tuple) {
+        ColumnList cols = new ColumnList();
+        if(this.columnFields != null){
+            // TODO timestamps
+            for(String field : this.columnFields){
+                cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
+            }
+        }
+        if(this.counterFields != null){
+            for(String field : this.counterFields){
+                cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
+            }
+        }
+        return cols;
+    }
+}
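
A minimal usage sketch of the fluent setters above (the "word" and "count" field names and the "cf" column family are hypothetical, chosen only for illustration):

    import backtype.storm.tuple.Fields;

    // Row key comes from the "word" field; "word" is written as a regular column and "count" as an HBase counter increment.
    TridentHBaseMapper mapper = new SimpleTridentHBaseMapper()
            .withRowKeyField("word")
            .withColumnFamily("cf")
            .withColumnFields(new Fields("word"))
            .withCounterFields(new Fields("count"));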

http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java b/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
new file mode 100644
index 0000000..66bf65e
--- /dev/null
+++ b/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
@@ -0,0 +1,31 @@
+package org.apache.storm.hbase.trident.mapper;
+
+
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.hbase.common.ColumnList;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+/**
+ * Maps a <code>storm.trident.tuple.TridentTuple</code> object
+ * to a row in an HBase table.
+ */
+public interface TridentHBaseMapper extends Serializable {
+
+
+    /**
+     * Given a tuple, return the HBase row key.
+     *
+     * @param tuple the incoming Trident tuple
+     * @return the row key as a byte array
+     */
+    byte[] rowKey(TridentTuple tuple);
+
+    /**
+     * Given a tuple, return the list of HBase columns to insert.
+     *
+     * @param tuple the incoming Trident tuple
+     * @return the columns (and counters) to write
+     */
+    ColumnList columns(TridentTuple tuple);
+}
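
Because the interface only declares the two methods above, a hand-rolled mapper is straightforward. The sketch below is purely illustrative (the class name and the "id"/"payload" field names are made up) and writes one column per tuple:

    // Hypothetical mapper: row key taken from an "id" field, a single "payload" column in family "cf".
    public class SingleColumnTridentHBaseMapper implements TridentHBaseMapper {
        private final byte[] family = "cf".getBytes();

        @Override
        public byte[] rowKey(TridentTuple tuple) {
            return tuple.getStringByField("id").getBytes();
        }

        @Override
        public ColumnList columns(TridentTuple tuple) {
            ColumnList cols = new ColumnList();
            cols.addColumn(family, "payload".getBytes(), tuple.getStringByField("payload").getBytes());
            return cols;
        }
    }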

http://git-wip-us.apache.org/repos/asf/storm/blob/20c4565d/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
new file mode 100644
index 0000000..832b8df
--- /dev/null
+++ b/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@ -0,0 +1,234 @@
+package org.apache.storm.hbase.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.storm.hbase.security.HBaseSecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.Serializable;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class HBaseMapState<T> implements IBackingMap<T> {
+    private static Logger LOG = LoggerFactory.getLogger(HBaseMapState.class);
+
+    private int partitionNum;
+
+
+    @SuppressWarnings("rawtypes")
+    private static final Map<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newHashMap();
+
+    static {
+        DEFAULT_SERIALIZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer());
+        DEFAULT_SERIALIZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
+        DEFAULT_SERIALIZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
+    }
+
+    private Options<T> options;
+    private Serializer<T> serializer;
+    private HTable table;
+
+    public HBaseMapState(final Options<T> options, Map map, int partitionNum) {
+        this.options = options;
+        this.serializer = options.serializer;
+        this.partitionNum = partitionNum;
+
+        final Configuration hbConfig = HBaseConfiguration.create();
+        Map<String, Object> conf = (Map<String, Object>)map.get(options.configKey);
+        if(conf == null){
+            LOG.info("HBase configuration not found using key '" + options.configKey + "'");
+            LOG.info("Using HBase config from first hbase-site.xml found on classpath.");
+        } else {
+            if (conf.get("hbase.rootdir") == null) {
+                LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
+            }
+            for (String key : conf.keySet()) {
+                hbConfig.set(key, String.valueOf(conf.get(key)));
+            }
+        }
+
+        try{
+            UserProvider provider = HBaseSecurityUtil.login(map, hbConfig);
+            this.table = provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<HTable>() {
+                @Override
+                public HTable run() throws IOException {
+                    return new HTable(hbConfig, options.tableName);
+                }
+            });
+        } catch(Exception e){
+            throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e);
+        }
+
+    }
+
+
+    public static class Options<T> implements Serializable {
+
+        public Serializer<T> serializer = null;
+        public int cacheSize = 5000;
+        public String globalKey = "$HBASE_STATE_GLOBAL$";
+        public String configKey = "hbase.config";
+        public String tableName;
+        public String columnFamily;
+        public String qualifier;
+
+    }
+
+
+    @SuppressWarnings("rawtypes")
+    public static StateFactory opaque() {
+        Options<OpaqueValue> options = new Options<OpaqueValue>();
+        return opaque(options);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static StateFactory opaque(Options<OpaqueValue> opts) {
+
+        return new Factory(StateType.OPAQUE, opts);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static StateFactory transactional() {
+        Options<TransactionalValue> options = new Options<TransactionalValue>();
+        return transactional(options);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static StateFactory transactional(Options<TransactionalValue> opts) {
+        return new Factory(StateType.TRANSACTIONAL, opts);
+    }
+
+    public static StateFactory nonTransactional() {
+        Options<Object> options = new Options<Object>();
+        return nonTransactional(options);
+    }
+
+    public static StateFactory nonTransactional(Options<Object> opts) {
+        return new Factory(StateType.NON_TRANSACTIONAL, opts);
+    }
+
+
+    protected static class Factory implements StateFactory {
+        private StateType stateType;
+        private Options options;
+
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        public Factory(StateType stateType, Options options) {
+            this.stateType = stateType;
+            this.options = options;
+
+            if (this.options.serializer == null) {
+                this.options.serializer = DEFAULT_SERIALIZERS.get(stateType);
+            }
+
+            if (this.options.serializer == null) {
+                throw new RuntimeException("Serializer should be specified for type: " + stateType);
+            }
+        }
+
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+            LOG.info("Preparing HBase State for partition {} of {}.", partitionIndex + 1, numPartitions);
+            IBackingMap state = new HBaseMapState(options, conf, partitionIndex);
+
+            if(options.cacheSize > 0) {
+                state = new CachedMap(state, options.cacheSize);
+            }
+
+            MapState mapState;
+            switch (stateType) {
+                case NON_TRANSACTIONAL:
+                    mapState = NonTransactionalMap.build(state);
+                    break;
+                case OPAQUE:
+                    mapState = OpaqueMap.build(state);
+                    break;
+                case TRANSACTIONAL:
+                    mapState = TransactionalMap.build(state);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown state type: " + stateType);
+            }
+            return new SnapshottableMap(mapState, new Values(options.globalKey));
+        }
+
+    }
+
+    @Override
+    public List<T> multiGet(List<List<Object>> keys) {
+        List<Get> gets = new ArrayList<Get>();
+        for(List<Object> key : keys){
+            LOG.info("Partition: {}, GET: {}", this.partitionNum, key);
+            Get get = new Get(toRowKey(key));
+            get.addColumn(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
+            gets.add(get);
+        }
+
+        List<T> retval = new ArrayList<T>();
+        try {
+            Result[] results = this.table.get(gets);
+            for (Result result : results) {
+                byte[] value = result.getValue(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
+                if(value != null) {
+                    retval.add(this.serializer.deserialize(value));
+                } else {
+                    retval.add(null);
+                }
+            }
+        } catch(IOException e){
+            throw new FailedException("IOException while reading from HBase.", e);
+        }
+        return retval;
+    }
+
+    @Override
+    public void multiPut(List<List<Object>> keys, List<T> values) {
+        List<Put> puts = new ArrayList<Put>(keys.size());
+        for (int i = 0; i < keys.size(); i++) {
+            LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, keys.get(i), new String(this.serializer.serialize(values.get(i)))});
+            Put put = new Put(toRowKey(keys.get(i)));
+            T val = values.get(i);
+            put.add(this.options.columnFamily.getBytes(),
+                    this.options.qualifier.getBytes(),
+                    this.serializer.serialize(val));
+
+            puts.add(put);
+        }
+        try {
+            this.table.put(puts);
+        } catch (InterruptedIOException e) {
+            throw new FailedException("Interrupted while writing to HBase", e);
+        } catch (RetriesExhaustedWithDetailsException e) {
+            throw new FailedException("Retries exhaused while writing to HBase", e);
+        }
+    }
+
+
+    private byte[] toRowKey(List<Object> keys) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            for (Object key : keys) {
+                bos.write(String.valueOf(key).getBytes());
+            }
+            bos.close();
+        } catch (IOException e){
+            throw new RuntimeException("IOException creating HBase row key.", e);
+        }
+        return bos.toByteArray();
+    }
+}
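
For orientation, a rough sketch of how this state might be wired into a Trident topology; the stream, the "word"/"count" field names, the "WordCount" table, and the use of the built-in Count aggregator are assumptions for illustration, not part of this commit:

    import backtype.storm.tuple.Fields;
    import storm.trident.TridentState;
    import storm.trident.operation.builtin.Count;
    import storm.trident.state.OpaqueValue;

    // Hypothetical wiring: persist per-word counts through the opaque map state.
    HBaseMapState.Options<OpaqueValue> opts = new HBaseMapState.Options<OpaqueValue>();
    opts.tableName = "WordCount";
    opts.columnFamily = "cf";
    opts.qualifier = "count";

    TridentState counts = stream
            .groupBy(new Fields("word"))
            .persistentAggregate(HBaseMapState.opaque(opts), new Count(), new Fields("count"));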