Posted to commits@hive.apache.org by br...@apache.org on 2013/07/24 16:04:24 UTC

svn commit: r1506563 - in /hive/trunk/hbase-handler/src: java/org/apache/hadoop/hive/hbase/ test/org/apache/hadoop/hive/hbase/

Author: brock
Date: Wed Jul 24 14:04:24 2013
New Revision: 1506563

URL: http://svn.apache.org/r1506563
Log:
HIVE-3725: Add support for pulling HBase columns with prefixes (Swarnim Kulkarni via Brock Noland)
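
For context, a minimal sketch of how the new prefix support is wired up
(property names come from the patch below; the mapping value is illustrative
and mirrors the new test's createPropertiesForColumnPrefixes):

    // Map the row key plus every qualifier in family "cola" that starts
    // with "prefixB_". Regex matching on the mapping defaults to true.
    java.util.Properties tbl = new java.util.Properties();
    tbl.setProperty("hbase.columns.mapping", ":key,cola:prefixB_.*");
    tbl.setProperty("hbase.columns.mapping.regex.matching", "true");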

Modified:
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
    hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Wed Jul 24 14:04:24 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -69,11 +68,16 @@ public class HBaseSerDe extends Abstract
   public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
   public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
   public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
+
+  /** Determines whether regex matching should be done on the columns. Defaults to true.
+   *  <strong>WARNING: Currently this only supports the suffix wildcard .*</strong> */
+  public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching";
 
   public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
 
   private ObjectInspector cachedObjectInspector;
   private String hbaseColumnsMapping;
+  private boolean doColumnRegexMatching;
   private List<ColumnMapping> columnsMapping;
   private SerDeParameters serdeParams;
   private boolean useJSONSerialize;
@@ -148,6 +152,21 @@ public class HBaseSerDe extends Abstract
    */
   public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
       throws SerDeException {
+    return parseColumnsMapping(columnsMappingSpec, true);
+  }
+
+  /**
+   * Parses the HBase columns mapping specifier to identify the column families and
+   * qualifiers, and caches the corresponding byte arrays. One of the Hive table
+   * columns maps to the HBase row key; by default, it is the first column.
+   *
+   * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
+   * @param doColumnRegexMatching whether to do a regex matching on the columns or not
+   * @return List<ColumnMapping> which contains the column mapping information by position
+   * @throws SerDeException
+   */
+  public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec, boolean doColumnRegexMatching)
+      throws SerDeException {
 
     if (columnsMappingSpec == null) {
       throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table.");
@@ -193,8 +212,20 @@ public class HBaseSerDe extends Abstract
         columnMapping.hbaseRowKey = false;
 
         if (parts.length == 2) {
-          columnMapping.qualifierName = parts[1];
-          columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
+
+          if (doColumnRegexMatching && parts[1].endsWith(".*")) {
+            // we have a prefix with a wildcard
+            columnMapping.qualifierPrefix = parts[1].substring(0, parts[1].length() - 2);
+            columnMapping.qualifierPrefixBytes = Bytes.toBytes(columnMapping.qualifierPrefix);
+            // we weren't provided any actual qualifier name. Set these to
+            // null.
+            columnMapping.qualifierName = null;
+            columnMapping.qualifierNameBytes = null;
+          } else {
+            // set the regular provided qualifier names
+            columnMapping.qualifierName = parts[1];
+            columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
+          }
         } else {
           columnMapping.qualifierName = null;
           columnMapping.qualifierNameBytes = null;
@@ -413,6 +444,8 @@ public class HBaseSerDe extends Abstract
     List<Boolean> binaryStorage;
     boolean hbaseRowKey;
     String mappingSpec;
+    String qualifierPrefix;
+    byte[] qualifierPrefixBytes;
   }
 
   private void initHBaseSerDeParameters(
@@ -424,8 +457,10 @@ public class HBaseSerDe extends Abstract
     String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
     putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
 
+    doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));
+
     // Parse and initialize the HBase columns mapping
-    columnsMapping = parseColumnsMapping(hbaseColumnsMapping);
+    columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
 
     // Build the type property string if not supplied
     if (columnTypeProperty == null) {
@@ -798,6 +833,7 @@ public class HBaseSerDe extends Abstract
     return columnsMapping.get(colPos).binaryStorage;
   }
 
+  @Override
   public SerDeStats getSerDeStats() {
     // no support for statistics
     return null;

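As a hedged illustration of the parsing above (values inferred from the new
wildcard branch; this snippet is not part of the commit itself):

    List<ColumnMapping> cols =
        HBaseSerDe.parseColumnsMapping(":key,cola:prefixB_.*", true);
    ColumnMapping cm = cols.get(1);
    // cm.qualifierPrefix       -> "prefixB_"
    // cm.qualifierPrefixBytes  -> Bytes.toBytes("prefixB_")
    // cm.qualifierName         -> null, since no literal qualifier was given
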
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Wed Jul 24 14:04:24 2013
@@ -39,9 +39,9 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
@@ -279,6 +279,8 @@ public class HBaseStorageHandler extends
     jobProperties.put(
       HBaseSerDe.HBASE_COLUMNS_MAPPING,
       tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
+    jobProperties.put(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING,
+        tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, "true"));
     jobProperties.put(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE,
       tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE,"string"));
     String scanCache = tableProperties.getProperty(HBaseSerDe.HBASE_SCAN_CACHE);

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Wed Jul 24 14:04:24 2013
@@ -91,11 +91,12 @@ public class HiveHBaseTableInputFormat e
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
     setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
     List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
     List<ColumnMapping> columnsMapping = null;
 
     try {
-      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
     } catch (SerDeException e) {
       throw new IOException(e);
     }
@@ -434,6 +435,7 @@ public class HiveHBaseTableInputFormat e
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
     setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
 
     if (hbaseColumnsMapping == null) {
       throw new IOException("hbase.columns.mapping required for HBase Table.");
@@ -441,7 +443,7 @@ public class HiveHBaseTableInputFormat e
 
     List<ColumnMapping> columnsMapping = null;
     try {
-      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
     } catch (SerDeException e) {
       throw new IOException(e);
     }

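The flag defaults to true both here and in HBaseStorageHandler above, so reads
behave the same whether or not the table property was set explicitly. A job
wanting the old literal behavior could opt out, e.g. (sketch only):

    // Treat a mapping such as "cola:prefixB_.*" as a literal qualifier
    // name rather than as a prefix pattern.
    jobConf.setBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, false);
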
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java Wed Jul 24 14:04:24 2013
@@ -21,10 +21,11 @@ package org.apache.hadoop.hive.hbase;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyMap;
@@ -42,6 +43,7 @@ public class LazyHBaseCellMap extends La
 
   private Result result;
   private byte [] columnFamilyBytes;
+  private byte[] qualPrefix;
   private List<Boolean> binaryStorage;
 
   /**
@@ -54,12 +56,21 @@ public class LazyHBaseCellMap extends La
 
   public void init(
       Result r,
-      byte [] columnFamilyBytes,
+      byte[] columnFamilyBytes,
       List<Boolean> binaryStorage) {
 
+    init(r, columnFamilyBytes, binaryStorage, null);
+  }
+
+  public void init(
+      Result r,
+      byte[] columnFamilyBytes,
+      List<Boolean> binaryStorage, byte[] qualPrefix) {
+
     result = r;
     this.columnFamilyBytes = columnFamilyBytes;
     this.binaryStorage = binaryStorage;
+    this.qualPrefix = qualPrefix;
     setParsed(false);
   }
 
@@ -80,6 +91,12 @@ public class LazyHBaseCellMap extends La
           continue;
         }
 
+        if (qualPrefix != null && !Bytes.startsWith(e.getKey(), qualPrefix)) {
+          // since we were provided a qualifier prefix, only accept qualifiers that start with this
+          // prefix
+          continue;
+        }
+
         LazyMapObjectInspector lazyMoi = getInspector();
 
         // Keys are always primitive

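The filtering added above is a plain prefix test on each qualifier in the
family map. A small sketch with sample data (qualifier names borrowed from
the new test below):

    byte[] qualPrefix = Bytes.toBytes("prefixB_");
    Bytes.startsWith(Bytes.toBytes("prefixB_col2"), qualPrefix); // true  -> kept
    Bytes.startsWith(Bytes.toBytes("unwanted_col"), qualPrefix); // false -> skipped
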
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Wed Jul 24 14:04:24 2013
@@ -142,9 +142,11 @@ public class LazyHBaseRow extends LazySt
       } else {
         if (colMap.qualifierName == null) {
           // it is a column family
-          // primitive type for Map<Key, Value> can be stored in binary format
+          // primitive type for Map<Key, Value> can be stored in binary format. Pass in the
+          // qualifier prefix to cherry-pick the qualifiers that match the prefix instead of picking
+          // up everything
           ((LazyHBaseCellMap) fields[fieldID]).init(
-              result, colMap.familyNameBytes, colMap.binaryStorage);
+              result, colMap.familyNameBytes, colMap.binaryStorage, colMap.qualifierPrefixBytes);
         } else {
           // it is a column i.e. a column-family with column-qualifier
           byte [] res = result.getValue(colMap.familyNameBytes, colMap.qualifierNameBytes);

Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Wed Jul 24 14:04:24 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.hbase;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -688,4 +690,117 @@ public class TestHBaseSerDe extends Test
     Put serializedPut = (Put) hbaseSerDe.serialize(row, soi);
     assertEquals("Serialized data: ", p.toString(), serializedPut.toString());
   }
+
+  public void testHBaseSerDeWithColumnPrefixes()
+      throws Exception {
+    byte[] cfa = "cola".getBytes();
+
+    byte[] qualA = "prefixA_col1".getBytes();
+    byte[] qualB = "prefixB_col2".getBytes();
+    byte[] qualC = "prefixB_col3".getBytes();
+    byte[] qualD = "unwanted_col".getBytes();
+
+    List<Object> expectedQualifiers = new ArrayList<Object>();
+    expectedQualifiers.add(new Text("prefixA_col1"));
+    expectedQualifiers.add(new Text("prefixB_col2"));
+    expectedQualifiers.add(new Text("prefixB_col3"));
+
+    byte[] rowKey = Bytes.toBytes("test-row1");
+
+    // Data
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    byte[] dataA = "This is first test data".getBytes();
+    byte[] dataB = "This is second test data".getBytes();
+    byte[] dataC = "This is third test data".getBytes();
+    byte[] dataD = "Unwanted data".getBytes();
+
+    kvs.add(new KeyValue(rowKey, cfa, qualA, dataA));
+    kvs.add(new KeyValue(rowKey, cfa, qualB, dataB));
+    kvs.add(new KeyValue(rowKey, cfa, qualC, dataC));
+    kvs.add(new KeyValue(rowKey, cfa, qualD, dataD));
+
+    Result r = new Result(kvs);
+
+    Put p = new Put(rowKey);
+
+    p.add(new KeyValue(rowKey, cfa, qualA, dataA));
+    p.add(new KeyValue(rowKey, cfa, qualB, dataB));
+    p.add(new KeyValue(rowKey, cfa, qualC, dataC));
+
+    Object[] expectedFieldsData = {
+        new Text("test-row1"),
+        new String("This is first test data"),
+        new String("This is second test data"),
+        new String("This is third test data")};
+
+    int[] expectedMapSize = new int[] {1, 2};
+
+    // Create, initialize, and test the SerDe
+    HBaseSerDe serDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createPropertiesForColumnPrefixes();
+    serDe.initialize(conf, tbl);
+
+    Object notPresentKey = new Text("unwanted_col");
+
+    deserializeAndSerializeHivePrefixColumnFamily(serDe, r, p, expectedFieldsData, expectedMapSize,
+        expectedQualifiers,
+        notPresentKey);
+  }
+
+  private Properties createPropertiesForColumnPrefixes() {
+    Properties tbl = new Properties();
+    tbl.setProperty(serdeConstants.LIST_COLUMNS,
+        "key,astring,along");
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+        "string:map<string,string>:map<string,string>");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key,cola:prefixA_.*,cola:prefixB_.*");
+
+    return tbl;
+  }
+
+  private void deserializeAndSerializeHivePrefixColumnFamily(HBaseSerDe serDe, Result r, Put p,
+      Object[] expectedFieldsData, int[] expectedMapSize, List<Object> expectedQualifiers,
+      Object notPresentKey)
+      throws SerDeException, IOException {
+    StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();
+
+    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+
+    Object row = serDe.deserialize(r);
+
+    int j = 0;
+
+    for (int i = 0; i < fieldRefs.size(); i++) {
+      Object fieldData = soi.getStructFieldData(row, fieldRefs.get(i));
+      assertNotNull(fieldData);
+
+      if (fieldData instanceof LazyPrimitive<?, ?>) {
+        assertEquals(expectedFieldsData[i], ((LazyPrimitive<?, ?>) fieldData).getWritableObject());
+      } else if (fieldData instanceof LazyHBaseCellMap) {
+        assertEquals(expectedFieldsData[i], ((LazyHBaseCellMap) fieldData)
+            .getMapValueElement(expectedQualifiers.get(j)).toString().trim());
+
+        assertEquals(expectedMapSize[j], ((LazyHBaseCellMap) fieldData).getMapSize());
+        // Make sure that the unwanted key is not present in the map
+        assertNull(((LazyHBaseCellMap) fieldData).getMapValueElement(notPresentKey));
+
+        j++;
+
+      } else {
+        fail("Error: field data not an instance of LazyPrimitive<?, ?> or LazyHBaseCellMap");
+      }
+    }
+
+    SerDeUtils.getJSONString(row, soi);
+
+    // Now serialize
+    Put put = (Put) serDe.serialize(row, soi);
+
+    if (p != null) {
+      assertEquals("Serialized put:", p.toString(), put.toString());
+    }
+  }
 }