You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2013/07/24 16:04:24 UTC
svn commit: r1506563 - in /hive/trunk/hbase-handler/src:
java/org/apache/hadoop/hive/hbase/ test/org/apache/hadoop/hive/hbase/
Author: brock
Date: Wed Jul 24 14:04:24 2013
New Revision: 1506563
URL: http://svn.apache.org/r1506563
Log:
HIVE-3725: Add support for pulling HBase columns with prefixes (Swarnim Kulkarni via Brock Noland)
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Wed Jul 24 14:04:24 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -69,11 +68,16 @@ public class HBaseSerDe extends Abstract
public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
+
+ /** Determines whether qualifiers in hbase.columns.mapping may use a wildcard pattern. Defaults to true.
+ * <strong>WARNING: currently only a trailing ".*" wildcard (a qualifier prefix match) is supported.</strong> */
+ public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching";
public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
private ObjectInspector cachedObjectInspector;
private String hbaseColumnsMapping;
+ private boolean doColumnRegexMatching;
private List<ColumnMapping> columnsMapping;
private SerDeParameters serdeParams;
private boolean useJSONSerialize;
@@ -148,6 +152,21 @@ public class HBaseSerDe extends Abstract
*/
public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
throws SerDeException {
+ return parseColumnsMapping(columnsMappingSpec, true);
+ }
+
+ /**
+ * Parses the HBase columns mapping specifier to identify the column families, qualifiers
+ * and also caches the byte arrays corresponding to them. One of the Hive table
+ * columns maps to the HBase row key, by default the first column.
+ *
+ * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
+ * @param doColumnRegexMatching if true, a qualifier ending in ".*" is treated as a prefix match
+ * @return List<ColumnMapping> which contains the column mapping information by position
+ * @throws SerDeException
+ */
+ public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec, boolean doColumnRegexMatching)
+ throws SerDeException {
if (columnsMappingSpec == null) {
throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table.");
@@ -193,8 +212,21 @@ public class HBaseSerDe extends Abstract
columnMapping.hbaseRowKey = false;
if (parts.length == 2) {
- columnMapping.qualifierName = parts[1];
- columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
+
+ if (doColumnRegexMatching && parts[1].endsWith(".*")) {
+ // we have a prefix with a wildcard
+ columnMapping.qualifierPrefix = parts[1].substring(0, parts[1].length() - 2);
+ columnMapping.qualifierPrefixBytes = Bytes.toBytes(columnMapping.qualifierPrefix);
+ // we weren't provided any actual qualifier name. Set these to
+ // null.
+ columnMapping.qualifierName = null;
+ columnMapping.qualifierNameBytes = null;
+ } else {
+ // set the regular provided qualifier names
+ columnMapping.qualifierName = parts[1];
+ columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
+ ;
+ }
} else {
columnMapping.qualifierName = null;
columnMapping.qualifierNameBytes = null;
@@ -413,6 +445,8 @@ public class HBaseSerDe extends Abstract
List<Boolean> binaryStorage;
boolean hbaseRowKey;
String mappingSpec;
+ String qualifierPrefix;
+ byte[] qualifierPrefixBytes;
}
private void initHBaseSerDeParameters(
@@ -424,8 +458,10 @@ public class HBaseSerDe extends Abstract
String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
+ doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));
+
// Parse and initialize the HBase columns mapping
- columnsMapping = parseColumnsMapping(hbaseColumnsMapping);
+ columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
// Build the type property string if not supplied
if (columnTypeProperty == null) {
@@ -798,6 +834,7 @@ public class HBaseSerDe extends Abstract
return columnsMapping.get(colPos).binaryStorage;
}
+ @Override
public SerDeStats getSerDeStats() {
// no support for statistics
return null;
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Wed Jul 24 14:04:24 2013
@@ -39,9 +39,9 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
@@ -279,6 +279,8 @@ public class HBaseStorageHandler extends
jobProperties.put(
HBaseSerDe.HBASE_COLUMNS_MAPPING,
tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
+ jobProperties.put(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING,
+ tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, "true"));
jobProperties.put(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE,
tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE,"string"));
String scanCache = tableProperties.getProperty(HBaseSerDe.HBASE_SCAN_CACHE);
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Wed Jul 24 14:04:24 2013
@@ -91,11 +91,12 @@ public class HiveHBaseTableInputFormat e
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
List<ColumnMapping> columnsMapping = null;
try {
- columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+ columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
} catch (SerDeException e) {
throw new IOException(e);
}
@@ -434,6 +435,7 @@ public class HiveHBaseTableInputFormat e
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
if (hbaseColumnsMapping == null) {
throw new IOException("hbase.columns.mapping required for HBase Table.");
@@ -441,7 +443,7 @@ public class HiveHBaseTableInputFormat e
List<ColumnMapping> columnsMapping = null;
try {
- columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+ columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching);
} catch (SerDeException e) {
throw new IOException(e);
}
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java Wed Jul 24 14:04:24 2013
@@ -21,10 +21,11 @@ package org.apache.hadoop.hive.hbase;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.NavigableMap;
import java.util.Map.Entry;
+import java.util.NavigableMap;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyMap;
@@ -42,6 +43,7 @@ public class LazyHBaseCellMap extends La
private Result result;
private byte [] columnFamilyBytes;
+ private byte[] qualPrefix;
private List<Boolean> binaryStorage;
/**
@@ -54,12 +56,21 @@ public class LazyHBaseCellMap extends La
public void init(
Result r,
- byte [] columnFamilyBytes,
+ byte[] columnFamilyBytes,
List<Boolean> binaryStorage) {
+ init(r, columnFamilyBytes, binaryStorage, null);
+ }
+
+ public void init(
+ Result r,
+ byte [] columnFamilyBytes,
+ List<Boolean> binaryStorage, byte[] qualPrefix) {
+
result = r;
this.columnFamilyBytes = columnFamilyBytes;
this.binaryStorage = binaryStorage;
+ this.qualPrefix = qualPrefix;
setParsed(false);
}
@@ -80,6 +91,12 @@ public class LazyHBaseCellMap extends La
continue;
}
+ if (qualPrefix != null && !Bytes.startsWith(e.getKey(), qualPrefix)) {
+ // since we were provided a qualifier prefix, only accept qualifiers that start with this
+ // prefix
+ continue;
+ }
+
LazyMapObjectInspector lazyMoi = getInspector();
// Keys are always primitive
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Wed Jul 24 14:04:24 2013
@@ -142,9 +142,11 @@ public class LazyHBaseRow extends LazySt
} else {
if (colMap.qualifierName == null) {
// it is a column family
- // primitive type for Map<Key, Value> can be stored in binary format
+ // primitive type for Map<Key, Value> can be stored in binary format. Pass in the
+ // qualifier prefix to cherry pick the qualifiers that match the prefix instead of picking
+ // up everything
((LazyHBaseCellMap) fields[fieldID]).init(
- result, colMap.familyNameBytes, colMap.binaryStorage);
+ result, colMap.familyNameBytes, colMap.binaryStorage, colMap.qualifierPrefixBytes);
} else {
// it is a column i.e. a column-family with column-qualifier
byte [] res = result.getValue(colMap.familyNameBytes, colMap.qualifierNameBytes);
Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=1506563&r1=1506562&r2=1506563&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Wed Jul 24 14:04:24 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.hbase;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -688,4 +690,123 @@ public class TestHBaseSerDe extends Test
Put serializedPut = (Put) hbaseSerDe.serialize(row, soi);
assertEquals("Serialized data: ", p.toString(), serializedPut.toString());
}
+
+ /**
+ * Verifies that "family:prefix.*" column mappings expose only the qualifiers with that prefix.
+ */
+ public void testHBaseSerDeWithColumnPrefixes()
+ throws Exception {
+ // Encode with Bytes.toBytes (always UTF-8) rather than the platform-dependent getBytes()
+ byte[] cfa = Bytes.toBytes("cola");
+
+ byte[] qualA = Bytes.toBytes("prefixA_col1");
+ byte[] qualB = Bytes.toBytes("prefixB_col2");
+ byte[] qualC = Bytes.toBytes("prefixB_col3");
+ // qualD matches neither "prefixA_.*" nor "prefixB_.*" and must be filtered out
+ byte[] qualD = Bytes.toBytes("unwanted_col");
+
+ // Qualifiers looked up in the map columns, in mapping order (prefixA_ first, then prefixB_).
+ // NOTE(review): the helper checks one entry per map column, so prefixB_col3 is not looked up.
+ List<Object> expectedQualifiers = new ArrayList<Object>();
+ expectedQualifiers.add(new Text("prefixA_col1"));
+ expectedQualifiers.add(new Text("prefixB_col2"));
+ expectedQualifiers.add(new Text("prefixB_col3"));
+
+ byte[] rowKey = Bytes.toBytes("test-row1");
+
+ byte[] dataA = Bytes.toBytes("This is first test data");
+ byte[] dataB = Bytes.toBytes("This is second test data");
+ byte[] dataC = Bytes.toBytes("This is third test data");
+ byte[] dataD = Bytes.toBytes("Unwanted data");
+
+ // The Result being deserialized holds all four cells, including the unwanted one
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ kvs.add(new KeyValue(rowKey, cfa, qualA, dataA));
+ kvs.add(new KeyValue(rowKey, cfa, qualB, dataB));
+ kvs.add(new KeyValue(rowKey, cfa, qualC, dataC));
+ kvs.add(new KeyValue(rowKey, cfa, qualD, dataD));
+ Result r = new Result(kvs);
+
+ // Re-serializing the row must produce only the cells whose qualifiers matched a prefix
+ Put p = new Put(rowKey);
+ p.add(new KeyValue(rowKey, cfa, qualA, dataA));
+ p.add(new KeyValue(rowKey, cfa, qualB, dataB));
+ p.add(new KeyValue(rowKey, cfa, qualC, dataC));
+
+ // Indexed by Hive field position (key, astring, along); the trailing entry is never reached
+ Object[] expectedFieldsData = {
+ new Text("test-row1"),
+ "This is first test data",
+ "This is second test data",
+ "This is third test data"};
+
+ // prefixA_ matches one qualifier, prefixB_ matches two
+ int[] expectedMapSize = new int[] {1, 2};
+
+ // Create, initialize, and test the SerDe
+ HBaseSerDe serDe = new HBaseSerDe();
+ Configuration conf = new Configuration();
+ Properties tbl = createPropertiesForColumnPrefixes();
+ serDe.initialize(conf, tbl);
+
+ Object notPresentKey = new Text("unwanted_col");
+
+ deserializeAndSerializeHivePrefixColumnFamily(serDe, r, p, expectedFieldsData, expectedMapSize,
+ expectedQualifiers, notPresentKey);
+ }
+
+ private Properties createPropertiesForColumnPrefixes() {
+ // Row key plus two map<string,string> columns, each mapped to a qualifier prefix of "cola"
+ Properties props = new Properties();
+ props.setProperty(serdeConstants.LIST_COLUMNS, "key,astring,along");
+ props.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+ "string:map<string,string>:map<string,string>");
+ // No HBASE_COLUMNS_REGEX_MATCHING entry: the ".*" suffixes rely on its default (true)
+ props.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key,cola:prefixA_.*,cola:prefixB_.*");
+ return props;
+ }
+
+ private void deserializeAndSerializeHivePrefixColumnFamily(HBaseSerDe serDe, Result r, Put p,
+ Object[] expectedFieldsData, int[] expectedMapSize, List<Object> expectedQualifiers,
+ Object notPresentKey)
+ throws SerDeException, IOException {
+ // Deserialize the HBase Result into a lazy Hive row and walk its top-level fields
+ StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();
+ List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+ Object row = serDe.deserialize(r);
+
+ // j advances once per map-typed field; it indexes expectedMapSize and expectedQualifiers
+ int j = 0;
+ for (int i = 0; i < fieldRefs.size(); i++) {
+ Object fieldData = soi.getStructFieldData(row, fieldRefs.get(i));
+ assertNotNull(fieldData);
+
+ if (fieldData instanceof LazyPrimitive<?, ?>) {
+ assertEquals(expectedFieldsData[i], ((LazyPrimitive<?, ?>) fieldData).getWritableObject());
+ } else if (fieldData instanceof LazyHBaseCellMap) {
+ LazyHBaseCellMap cellMap = (LazyHBaseCellMap) fieldData;
+ Object expectedKey = expectedQualifiers.get(j);
+ Object value = cellMap.getMapValueElement(expectedKey);
+ // Fail with a clear message instead of an NPE when the qualifier was not picked up
+ assertNotNull("Missing map entry for qualifier " + expectedKey, value);
+ assertEquals(expectedFieldsData[i], value.toString().trim());
+ // Only qualifiers matching this column's prefix may appear in the map
+ assertEquals(expectedMapSize[j], cellMap.getMapSize());
+ assertNull(cellMap.getMapValueElement(notPresentKey));
+ j++;
+ } else {
+ fail("Error: field data not an instance of LazyPrimitive<?, ?> or LazyHBaseCellMap");
+ }
+ }
+
+ // Smoke-check that the deserialized row can be rendered as JSON
+ assertNotNull(SerDeUtils.getJSONString(row, soi));
+
+ // Now serialize and, when an expected Put is supplied, compare against it
+ Put put = (Put) serDe.serialize(row, soi);
+ if (p != null) {
+ assertEquals("Serialized put:", p.toString(), put.toString());
+ }
+ }
}