You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by ha...@apache.org on 2012/03/09 01:47:38 UTC
svn commit: r1298673 [1/4] - in /hive/trunk:
hbase-handler/src/java/org/apache/hadoop/hive/hbase/
hbase-handler/src/test/org/apache/hadoop/hive/hbase/
hbase-handler/src/test/queries/ hbase-handler/src/test/results/
serde/src/java/org/apache/hadoop/hive...
Author: hashutosh
Date: Fri Mar 9 00:47:37 2012
New Revision: 1298673
URL: http://svn.apache.org/viewvc?rev=1298673&view=rev
Log:
HIVE-1634: Allow access to Primitive types stored in binary format in HBase (Basab Maulik, Ashutosh Chauhan via hashutosh)
Added:
hive/trunk/hbase-handler/src/test/queries/hbase_binary_external_table_queries.q
hive/trunk/hbase-handler/src/test/queries/hbase_binary_map_queries.q
hive/trunk/hbase-handler/src/test/queries/hbase_binary_storage_queries.q
hive/trunk/hbase-handler/src/test/results/hbase_binary_external_table_queries.q.out
hive/trunk/hbase-handler/src/test/results/hbase_binary_map_queries.q.out
hive/trunk/hbase-handler/src/test/results/hbase_binary_storage_queries.q.out
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioBoolean.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioByte.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioDouble.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioFloat.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioInteger.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioLong.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioShort.java
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyPrimitive.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Fri Mar 9 00:47:37 2012
@@ -62,16 +62,14 @@ public class HBaseSerDe implements SerDe
public static final String HBASE_COLUMNS_MAPPING = "hbase.columns.mapping";
public static final String HBASE_TABLE_NAME = "hbase.table.name";
+ public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type";
public static final String HBASE_KEY_COL = ":key";
public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
private ObjectInspector cachedObjectInspector;
private String hbaseColumnsMapping;
- private List<String> hbaseColumnFamilies;
- private List<byte []> hbaseColumnFamiliesBytes;
- private List<String> hbaseColumnQualifiers;
- private List<byte []> hbaseColumnQualifiersBytes;
+ private List<ColumnMapping> columnsMapping;
private SerDeParameters serdeParams;
private boolean useJSONSerialize;
private LazyHBaseRow cachedHBaseRow;
@@ -110,8 +108,7 @@ public class HBaseSerDe implements SerDe
public void initialize(Configuration conf, Properties tbl)
throws SerDeException {
- initHBaseSerDeParameters(conf, tbl,
- getClass().getName());
+ initHBaseSerDeParameters(conf, tbl, getClass().getName());
cachedObjectInspector = LazyFactory.createLazyStructInspector(
serdeParams.getColumnNames(),
@@ -136,118 +133,283 @@ public class HBaseSerDe implements SerDe
}
/**
- * Parses the HBase columns mapping to identify the column families, qualifiers
+ * Parses the HBase columns mapping specifier to identify the column families, qualifiers
* and also caches the byte arrays corresponding to them. One of the Hive table
* columns maps to the HBase row key, by default the first column.
*
- * @param columnMapping - the column mapping specification to be parsed
- * @param colFamilies - the list of HBase column family names
- * @param colFamiliesBytes - the corresponding byte array
- * @param colQualifiers - the list of HBase column qualifier names
- * @param colQualifiersBytes - the corresponding byte array
- * @return the row key index in the column names list
+ * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
+ * @return List<ColumnMapping> which contains the column mapping information by position
* @throws SerDeException
*/
- public static int parseColumnMapping(
- String columnMapping,
- List<String> colFamilies,
- List<byte []> colFamiliesBytes,
- List<String> colQualifiers,
- List<byte []> colQualifiersBytes) throws SerDeException {
-
- int rowKeyIndex = -1;
-
- if (colFamilies == null || colQualifiers == null) {
- throw new SerDeException("Error: caller must pass in lists for the column families " +
- "and qualifiers.");
- }
-
- colFamilies.clear();
- colQualifiers.clear();
+ public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
+ throws SerDeException {
- if (columnMapping == null) {
+ if (columnsMappingSpec == null) {
throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table.");
}
- if (columnMapping.equals("") || columnMapping.equals(HBASE_KEY_COL)) {
+ if (columnsMappingSpec.equals("") || columnsMappingSpec.equals(HBASE_KEY_COL)) {
throw new SerDeException("Error: hbase.columns.mapping specifies only the HBase table"
+ " row key. A valid Hive-HBase table must specify at least one additional column.");
}
- String [] mapping = columnMapping.split(",");
+ int rowKeyIndex = -1;
+ List<ColumnMapping> columnsMapping = new ArrayList<ColumnMapping>();
+ String [] columnSpecs = columnsMappingSpec.split(",");
+ ColumnMapping columnMapping = null;
+
+ for (int i = 0; i < columnSpecs.length; i++) {
+ String mappingSpec = columnSpecs[i];
+ String [] mapInfo = mappingSpec.split("#");
+ String colInfo = mapInfo[0];
- for (int i = 0; i < mapping.length; i++) {
- String elem = mapping[i];
- int idxFirst = elem.indexOf(":");
- int idxLast = elem.lastIndexOf(":");
+ int idxFirst = colInfo.indexOf(":");
+ int idxLast = colInfo.lastIndexOf(":");
if (idxFirst < 0 || !(idxFirst == idxLast)) {
throw new SerDeException("Error: the HBase columns mapping contains a badly formed " +
"column family, column qualifier specification.");
}
- if (elem.equals(HBASE_KEY_COL)) {
+ columnMapping = new ColumnMapping();
+
+ if (colInfo.equals(HBASE_KEY_COL)) {
rowKeyIndex = i;
- colFamilies.add(elem);
- colQualifiers.add(null);
+ columnMapping.familyName = colInfo;
+ columnMapping.familyNameBytes = Bytes.toBytes(colInfo);
+ columnMapping.qualifierName = null;
+ columnMapping.qualifierNameBytes = null;
+ columnMapping.hbaseRowKey = true;
} else {
- String [] parts = elem.split(":");
+ String [] parts = colInfo.split(":");
assert(parts.length > 0 && parts.length <= 2);
- colFamilies.add(parts[0]);
+ columnMapping.familyName = parts[0];
+ columnMapping.familyNameBytes = Bytes.toBytes(parts[0]);
+ columnMapping.hbaseRowKey = false;
if (parts.length == 2) {
- colQualifiers.add(parts[1]);
+ columnMapping.qualifierName = parts[1];
+ columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
} else {
- colQualifiers.add(null);
+ columnMapping.qualifierName = null;
+ columnMapping.qualifierNameBytes = null;
}
}
+
+ columnMapping.mappingSpec = mappingSpec;
+
+ columnsMapping.add(columnMapping);
}
if (rowKeyIndex == -1) {
- colFamilies.add(0, HBASE_KEY_COL);
- colQualifiers.add(0, null);
- rowKeyIndex = 0;
- }
+ columnMapping = new ColumnMapping();
+ columnMapping.familyName = HBASE_KEY_COL;
+ columnMapping.familyNameBytes = Bytes.toBytes(HBASE_KEY_COL);
+ columnMapping.qualifierName = null;
+ columnMapping.qualifierNameBytes = null;
+ columnMapping.hbaseRowKey = true;
+ columnMapping.mappingSpec = HBASE_KEY_COL;
+ columnsMapping.add(0, columnMapping);
+ }
+
+ return columnsMapping;
+ }
+
+ /**
+ * Parses the per-column storage specifiers (the optional suffix after '#' in each entry of
+ * hbase.columns.mapping, conceptually a list like '-,b,s,-,s:b,...') that select binary or
+ * UTF-8 string format to serialize and deserialize primitive data types such as boolean,
+ * byte, short, int, long, float, and double. This applies to regular columns and also to map
+ * columns associated with an HBase column family; for maps, the specification applies to the
+ * key or the value, provided it is one of the above primitive types, and is a colon-separated
+ * pair such as -:s or b:b. 's' (or any prefix of 'string') selects string format storage,
+ * 'b' (or any prefix of 'binary') selects native fixed-width, byte-oriented storage, and '-'
+ * uses the table-level default.
+ *
+ * @param hbaseTableDefaultStorageType the value of the table property
+ * hbase.table.default.storage.type: 'string' or 'binary' (null or empty means 'string')
+ * @throws SerDeException on parse error.
+ */
- if (colFamilies.size() != colQualifiers.size()) {
- throw new SerDeException("Error in parsing the hbase columns mapping.");
- }
+ private void parseColumnStorageTypes(String hbaseTableDefaultStorageType)
+ throws SerDeException {
- // populate the corresponding byte [] if the client has passed in a non-null list
- if (colFamiliesBytes != null) {
- colFamiliesBytes.clear();
+ boolean tableBinaryStorage = false;
- for (String fam : colFamilies) {
- colFamiliesBytes.add(Bytes.toBytes(fam));
+ if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) {
+ if (hbaseTableDefaultStorageType.equals("binary")) {
+ tableBinaryStorage = true;
+ } else if (!hbaseTableDefaultStorageType.equals("string")) {
+ throw new SerDeException("Error: " + HBASE_TABLE_DEFAULT_STORAGE_TYPE +
+ " parameter must be specified as" +
+ " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
+ "' is not a valid specification for this table/serde property.");
}
}
- if (colQualifiersBytes != null) {
- colQualifiersBytes.clear();
+ // parse the string to determine the column-level storage type for primitive types
+ // 's' (or any prefix of 'string') is for variable-length string format storage
+ // 'b' (or any prefix of 'binary') is for fixed-width binary storage of bytes
+ // '-' uses the table storage type, which defaults to UTF-8 string
+ // string data is always stored in the default escaped storage format; the data types
+ // boolean, byte, short, int, long, float, and double have a binary, byte-oriented storage option
+ List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
+
+ for (int i = 0; i < columnsMapping.size(); i++) {
+
+ ColumnMapping colMap = columnsMapping.get(i);
+ TypeInfo colType = columnTypes.get(i);
+ String mappingSpec = colMap.mappingSpec;
+ String [] mapInfo = mappingSpec.split("#");
+ String [] storageInfo = null;
+
+ if (mapInfo.length == 2) {
+ storageInfo = mapInfo[1].split(":");
+ }
+
+ if (storageInfo == null) {
+
+ // use the table default storage specification
+ if (colType.getCategory() == Category.PRIMITIVE) {
+ if (!colType.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else if (colType.getCategory() == Category.MAP) {
+ TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+ TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+ if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
+ !keyTypeInfo.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
- for (String qual : colQualifiers) {
- if (qual == null) {
- colQualifiersBytes.add(null);
+ if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
+ !valueTypeInfo.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
} else {
- colQualifiersBytes.add(Bytes.toBytes(qual));
+ colMap.binaryStorage.add(false);
+ }
+
+ } else if (storageInfo.length == 1) {
+ // we have a storage specification for a primitive column type
+ String storageOption = storageInfo[0];
+
+ if ((colType.getCategory() == Category.MAP) ||
+ !(storageOption.equals("-") || "string".startsWith(storageOption) ||
+ "binary".startsWith(storageOption))) {
+ throw new SerDeException("Error: A column storage specification is one of the following:"
+ + " '-', a prefix of 'string', or a prefix of 'binary'. "
+ + storageOption + " is not a valid storage option specification for "
+ + serdeParams.getColumnNames().get(i));
+ }
+
+ if (colType.getCategory() == Category.PRIMITIVE &&
+ !colType.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+
+ if ("-".equals(storageOption)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(storageOption)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ } else if (storageInfo.length == 2) {
+ // we have a storage specification for a map column type
+
+ String keyStorage = storageInfo[0];
+ String valStorage = storageInfo[1];
+
+ if ((colType.getCategory() != Category.MAP) ||
+ !(keyStorage.equals("-") || "string".startsWith(keyStorage) ||
+ "binary".startsWith(keyStorage)) ||
+ !(valStorage.equals("-") || "string".startsWith(valStorage) ||
+ "binary".startsWith(valStorage))) {
+ throw new SerDeException("Error: To specify a valid column storage type for a Map"
+ + " column, use any two specifiers from '-', a prefix of 'string', "
+ + " and a prefix of 'binary' separated by a ':'."
+ + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the"
+ + " key and value parts of the Map<?,?> respectively."
+ + " Invalid storage specification for column "
+ + serdeParams.getColumnNames().get(i)
+ + "; " + storageInfo[0] + ":" + storageInfo[1]);
+ }
+
+ TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+ TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+ if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
+ !keyTypeInfo.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+
+ if (keyStorage.equals("-")) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(keyStorage)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
+ !valueTypeInfo.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+ if (valStorage.equals("-")) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(valStorage)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (colMap.binaryStorage.size() != 2) {
+ throw new SerDeException("Error: In parsing the storage specification for column "
+ + serdeParams.getColumnNames().get(i));
}
- }
- }
- if (colFamiliesBytes != null && colQualifiersBytes != null) {
- if (colFamiliesBytes.size() != colQualifiersBytes.size()) {
- throw new SerDeException("Error in caching the bytes for the hbase column families " +
- "and qualifiers.");
+ } else {
+ // error in storage specification
+ throw new SerDeException("Error: " + HBASE_COLUMNS_MAPPING + " storage specification "
+ + mappingSpec + " is not valid for column: "
+ + serdeParams.getColumnNames().get(i));
}
}
-
- return rowKeyIndex;
}
- public static boolean isSpecialColumn(String hbaseColumnName) {
+
+ public static boolean isRowKeyColumn(String hbaseColumnName) {
return hbaseColumnName.equals(HBASE_KEY_COL);
}
+
+ static class ColumnMapping {
+
+ ColumnMapping() {
+ binaryStorage = new ArrayList<Boolean>(2);
+ }
+
+ String familyName;
+ String qualifierName;
+ byte [] familyNameBytes;
+ byte [] qualifierNameBytes;
+ List<Boolean> binaryStorage;
+ boolean hbaseRowKey;
+ String mappingSpec;
+ }
+
private void initHBaseSerDeParameters(
Configuration job, Properties tbl, String serdeName)
throws SerDeException {
@@ -257,33 +419,27 @@ public class HBaseSerDe implements SerDe
String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
- // Parse the HBase columns mapping and initialize the col family & qualifiers
- hbaseColumnFamilies = new ArrayList<String>();
- hbaseColumnFamiliesBytes = new ArrayList<byte []>();
- hbaseColumnQualifiers = new ArrayList<String>();
- hbaseColumnQualifiersBytes = new ArrayList<byte []>();
- iKey = parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
- hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
+ // Parse and initialize the HBase columns mapping
+ columnsMapping = parseColumnsMapping(hbaseColumnsMapping);
// Build the type property string if not supplied
if (columnTypeProperty == null) {
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
+ for (int i = 0; i < columnsMapping.size(); i++) {
if (sb.length() > 0) {
sb.append(":");
}
- String colFamily = hbaseColumnFamilies.get(i);
- String colQualifier = hbaseColumnQualifiers.get(i);
- if (isSpecialColumn(colFamily)) {
- // the row key column becomes a STRING
- sb.append(Constants.STRING_TYPE_NAME);
- } else if (colQualifier == null) {
+
+ ColumnMapping colMap = columnsMapping.get(i);
+
+ if (colMap.hbaseRowKey) {
+ // the row key column becomes a STRING
+ sb.append(Constants.STRING_TYPE_NAME);
+ } else if (colMap.qualifierName == null) {
// a column family become a MAP
- sb.append(
- Constants.MAP_TYPE_NAME + "<"
- + Constants.STRING_TYPE_NAME
- + "," + Constants.STRING_TYPE_NAME + ">");
+ sb.append(Constants.MAP_TYPE_NAME + "<" + Constants.STRING_TYPE_NAME + ","
+ + Constants.STRING_TYPE_NAME + ">");
} else {
// an individual column becomes a STRING
sb.append(Constants.STRING_TYPE_NAME);
@@ -294,11 +450,11 @@ public class HBaseSerDe implements SerDe
serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
- if (hbaseColumnFamilies.size() != serdeParams.getColumnNames().size()) {
+ if (columnsMapping.size() != serdeParams.getColumnNames().size()) {
throw new SerDeException(serdeName + ": columns has " +
serdeParams.getColumnNames().size() +
" elements while hbase.columns.mapping has " +
- hbaseColumnFamilies.size() + " elements" +
+ columnsMapping.size() + " elements" +
" (counting the key if implicit)");
}
@@ -308,24 +464,29 @@ public class HBaseSerDe implements SerDe
needsEscape = serdeParams.getNeedsEscape();
// check that the mapping schema is right;
- // check that the "column-family:" is mapped to MAP<String,?>
- for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
- String colFamily = hbaseColumnFamilies.get(i);
- String colQualifier = hbaseColumnQualifiers.get(i);
- if (colQualifier == null && !isSpecialColumn(colFamily)) {
+ // check that the "column-family:" is mapped to Map<key,?>
+ // where key extends LazyPrimitive<?, ?> and thus has type Category.PRIMITIVE
+ for (int i = 0; i < columnsMapping.size(); i++) {
+ ColumnMapping colMap = columnsMapping.get(i);
+ if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
if ((typeInfo.getCategory() != Category.MAP) ||
- (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName()
- != Constants.STRING_TYPE_NAME)) {
+ (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
+ != Category.PRIMITIVE)) {
throw new SerDeException(
- serdeName + ": hbase column family '"
- + colFamily
- + "' should be mapped to Map<String,?> but is mapped to "
+ serdeName + ": hbase column family '" + colMap.familyName
+ + "' should be mapped to Map<? extends LazyPrimitive<?, ?>,?>, that is "
+ + "the Key for the map should be of primitive type, but is mapped to "
+ typeInfo.getTypeName());
}
}
}
+
+ // Precondition: make sure this is done after the rest of the SerDe initialization is done.
+ String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
+ parseColumnStorageTypes(hbaseTableStorageType);
+ setKeyColumnOffset();
}
/**
@@ -341,8 +502,7 @@ public class HBaseSerDe implements SerDe
throw new SerDeException(getClass().getName() + ": expects Result!");
}
- cachedHBaseRow.init((Result) result, hbaseColumnFamilies, hbaseColumnFamiliesBytes,
- hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
+ cachedHBaseRow.init((Result) result, columnsMapping);
return cachedHBaseRow;
}
@@ -413,9 +573,8 @@ public class HBaseSerDe implements SerDe
List<Object> list,
List<? extends StructField> declaredFields) throws IOException {
- // column name
- String hbaseColumnFamily = hbaseColumnFamilies.get(i);
- String hbaseColumnQualifier = hbaseColumnQualifiers.get(i);
+ // column mapping info
+ ColumnMapping colMap = columnsMapping.get(i);
// Get the field objectInspector and the field object.
ObjectInspector foi = fields.get(i).getFieldObjectInspector();
@@ -427,8 +586,8 @@ public class HBaseSerDe implements SerDe
}
// If the field corresponds to a column family in HBase
- if (hbaseColumnQualifier == null && !isSpecialColumn(hbaseColumnFamily)) {
- MapObjectInspector moi = (MapObjectInspector)foi;
+ if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+ MapObjectInspector moi = (MapObjectInspector) foi;
ObjectInspector koi = moi.getMapKeyObjectInspector();
ObjectInspector voi = moi.getMapValueObjectInspector();
@@ -439,7 +598,12 @@ public class HBaseSerDe implements SerDe
for (Map.Entry<?, ?> entry: map.entrySet()) {
// Get the Key
serializeStream.reset();
- serialize(entry.getKey(), koi, 3);
+
+ // Map keys are required to be primitive and may be serialized in binary format
+ boolean isNotNull = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0));
+ if (!isNotNull) {
+ continue;
+ }
// Get the column-qualifier
byte [] columnQualifierBytes = new byte[serializeStream.getCount()];
@@ -448,13 +612,16 @@ public class HBaseSerDe implements SerDe
// Get the Value
serializeStream.reset();
- boolean isNotNull = serialize(entry.getValue(), voi, 3);
+
+ // Map values may be serialized in binary format when they are primitive and binary
+ // serialization is the option selected
+ isNotNull = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1));
if (!isNotNull) {
continue;
}
byte [] value = new byte[serializeStream.getCount()];
System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount());
- put.add(hbaseColumnFamiliesBytes.get(i), columnQualifierBytes, value);
+ put.add(colMap.familyNameBytes, columnQualifierBytes, value);
}
}
} else {
@@ -470,12 +637,15 @@ public class HBaseSerDe implements SerDe
declaredFields.get(i).getFieldObjectInspector().getCategory()
.equals(Category.PRIMITIVE) || useJSONSerialize)) {
+ // we always serialize the String type using the escaped algorithm for LazyString
isNotNull = serialize(
SerDeUtils.getJSONString(f, foi),
PrimitiveObjectInspectorFactory.javaStringObjectInspector,
- 1);
+ 1, false);
} else {
- isNotNull = serialize(f, foi, 1);
+ // use the serialization option switch to write primitive values as either a variable
+ // length UTF8 string or a fixed width bytes if serializing in binary format
+ isNotNull = serialize(f, foi, 1, colMap.binaryStorage.get(0));
}
if (!isNotNull) {
return null;
@@ -485,32 +655,49 @@ public class HBaseSerDe implements SerDe
if (i == iKey) {
return key;
}
- put.add(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i), key);
+ put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, key);
}
return null;
}
- /**
+ /**
* Serialize the row into a ByteStream.
*
* @param obj The object for the current field.
* @param objInspector The ObjectInspector for the current Object.
* @param level The current level of separator.
- * @throws IOException
- * @return true, if serialize is a not-null object; otherwise false.
+ * @param writeBinary Whether to write a primitive object as a UTF-8 variable-length string or
+ * as a fixed-width byte array onto the byte stream.
+ * @throws IOException On error in writing to the serialization stream.
+ * @return true On serializing a non-null object, otherwise false.
*/
- private boolean serialize(Object obj, ObjectInspector objInspector, int level)
- throws IOException {
+ private boolean serialize(
+ Object obj,
+ ObjectInspector objInspector,
+ int level,
+ boolean writeBinary) throws IOException {
+
+ if (objInspector.getCategory() == Category.PRIMITIVE && writeBinary) {
+ LazyUtils.writePrimitive(serializeStream, obj, (PrimitiveObjectInspector) objInspector);
+ return true;
+ } else {
+ return serialize(obj, objInspector, level);
+ }
+ }
+
+ private boolean serialize(
+ Object obj,
+ ObjectInspector objInspector,
+ int level) throws IOException {
switch (objInspector.getCategory()) {
case PRIMITIVE: {
- LazyUtils.writePrimitiveUTF8(
- serializeStream, obj,
- (PrimitiveObjectInspector) objInspector,
- escaped, escapeChar, needsEscape);
+ LazyUtils.writePrimitiveUTF8(serializeStream, obj,
+ (PrimitiveObjectInspector) objInspector, escaped, escapeChar, needsEscape);
return true;
}
+
case LIST: {
char separator = (char) separators[level];
ListObjectInspector loi = (ListObjectInspector)objInspector;
@@ -528,6 +715,7 @@ public class HBaseSerDe implements SerDe
}
return true;
}
+
case MAP: {
char separator = (char) separators[level];
char keyValueSeparator = (char) separators[level+1];
@@ -553,6 +741,7 @@ public class HBaseSerDe implements SerDe
}
return true;
}
+
case STRUCT: {
char separator = (char)separators[level];
StructObjectInspector soi = (StructObjectInspector)objInspector;
@@ -565,7 +754,9 @@ public class HBaseSerDe implements SerDe
if (i > 0) {
serializeStream.write(separator);
}
- serialize(list.get(i), fields.get(i).getFieldObjectInspector(), level + 1);
+
+ serialize(list.get(i), fields.get(i).getFieldObjectInspector(),
+ level + 1);
}
}
return true;
@@ -601,4 +792,23 @@ public class HBaseSerDe implements SerDe
// no support for statistics
return null;
}
+
+ void setKeyColumnOffset() throws SerDeException {
+ iKey = getRowKeyColumnOffset(columnsMapping);
+ }
+
+ public static int getRowKeyColumnOffset(List<ColumnMapping> columnsMapping)
+ throws SerDeException {
+
+ for (int i = 0; i < columnsMapping.size(); i++) {
+ ColumnMapping colMap = columnsMapping.get(i);
+
+ if (colMap.hbaseRowKey && colMap.familyName.equals(HBASE_KEY_COL)) {
+ return i;
+ }
+ }
+
+ throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" +
+ " row key column.");
+ }
}
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java Fri Mar 9 00:47:37 2012
@@ -49,8 +49,8 @@ public class HBaseStatsAggregator implem
public boolean connect(Configuration hiveconf) {
try {
- HBaseConfiguration hbaseConf = new HBaseConfiguration(hiveconf);
- htable = new HTable(hbaseConf, HBaseStatsSetupConstants.PART_STAT_TABLE_NAME);
+ htable = new HTable(HBaseConfiguration.create(hiveconf),
+ HBaseStatsSetupConstants.PART_STAT_TABLE_NAME);
return true;
} catch (IOException e) {
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java Fri Mar 9 00:47:37 2012
@@ -49,8 +49,8 @@ public class HBaseStatsPublisher impleme
public boolean connect(Configuration hiveconf) {
try {
- HBaseConfiguration hbaseConf = new HBaseConfiguration(hiveconf);
- htable = new HTable(hbaseConf, HBaseStatsSetupConstants.PART_STAT_TABLE_NAME);
+ htable = new HTable(HBaseConfiguration.create(hiveconf),
+ HBaseStatsSetupConstants.PART_STAT_TABLE_NAME);
// for performance reason, defer update until the closeConnection
htable.setAutoFlush(false);
} catch (IOException e) {
@@ -130,8 +130,7 @@ public class HBaseStatsPublisher impleme
*/
public boolean init(Configuration hiveconf) {
try {
- HBaseConfiguration hbaseConf = new HBaseConfiguration(hiveconf);
- HBaseAdmin hbase = new HBaseAdmin(hbaseConf);
+ HBaseAdmin hbase = new HBaseAdmin(HBaseConfiguration.create(hiveconf));
// Creating table if not exists
if (!hbase.tableExists(HBaseStatsSetupConstants.PART_STAT_TABLE_NAME)) {
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Fri Mar 9 00:47:37 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.MasterNot
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Constants;
@@ -63,7 +64,7 @@ public class HBaseStorageHandler extends
final static public String DEFAULT_PREFIX = "default.";
- private HBaseConfiguration hbaseConf;
+ private Configuration hbaseConf;
private HBaseAdmin admin;
private HBaseAdmin getHBaseAdmin() throws MetaException {
@@ -137,17 +138,9 @@ public class HBaseStorageHandler extends
String tableName = getHBaseTableName(tbl);
Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();
String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ List<ColumnMapping> columnsMapping = null;
- if (hbaseColumnsMapping == null) {
- throw new MetaException("No hbase.columns.mapping defined in Serde.");
- }
-
- List<String> hbaseColumnFamilies = new ArrayList<String>();
- List<String> hbaseColumnQualifiers = new ArrayList<String>();
- List<byte []> hbaseColumnFamiliesBytes = new ArrayList<byte []>();
- List<byte []> hbaseColumnQualifiersBytes = new ArrayList<byte []>();
- int iKey = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
- hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
+ columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
HTableDescriptor tableDesc;
@@ -156,8 +149,13 @@ public class HBaseStorageHandler extends
if (!isExternal) {
// Create the column descriptors
tableDesc = new HTableDescriptor(tableName);
- Set<String> uniqueColumnFamilies = new HashSet<String>(hbaseColumnFamilies);
- uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
+ Set<String> uniqueColumnFamilies = new HashSet<String>();
+
+ for (ColumnMapping colMap : columnsMapping) {
+ if (!colMap.hbaseRowKey) {
+ uniqueColumnFamilies.add(colMap.familyName);
+ }
+ }
for (String columnFamily : uniqueColumnFamilies) {
tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes(columnFamily)));
@@ -179,13 +177,15 @@ public class HBaseStorageHandler extends
// make sure the schema mapping is right
tableDesc = getHBaseAdmin().getTableDescriptor(Bytes.toBytes(tableName));
- for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
- if (i == iKey) {
+ for (int i = 0; i < columnsMapping.size(); i++) {
+ ColumnMapping colMap = columnsMapping.get(i);
+
+ if (colMap.hbaseRowKey) {
continue;
}
- if (!tableDesc.hasFamily(hbaseColumnFamiliesBytes.get(i))) {
- throw new MetaException("Column Family " + hbaseColumnFamilies.get(i)
+ if (!tableDesc.hasFamily(colMap.familyNameBytes)) {
+ throw new MetaException("Column Family " + colMap.familyName
+ " is not defined in hbase table " + tableName);
}
}
@@ -231,7 +231,7 @@ public class HBaseStorageHandler extends
@Override
public void setConf(Configuration conf) {
- hbaseConf = new HBaseConfiguration(conf);
+ hbaseConf = HBaseConfiguration.create(conf);
}
@Override
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Fri Mar 9 00:47:37 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.mapreduce
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
@@ -79,23 +80,18 @@ public class HiveHBaseTableInputFormat e
HBaseSplit hbaseSplit = (HBaseSplit) split;
TableSplit tableSplit = hbaseSplit.getSplit();
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
- setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
+ setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
- List<String> hbaseColumnFamilies = new ArrayList<String>();
- List<String> hbaseColumnQualifiers = new ArrayList<String>();
- List<byte []> hbaseColumnFamiliesBytes = new ArrayList<byte []>();
- List<byte []> hbaseColumnQualifiersBytes = new ArrayList<byte []>();
+ List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
+ List<ColumnMapping> columnsMapping = null;
- int iKey;
try {
- iKey = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
- hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
- } catch (SerDeException se) {
- throw new IOException(se);
+ columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+ } catch (SerDeException e) {
+ throw new IOException(e);
}
- List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
- if (hbaseColumnFamilies.size() < readColIDs.size()) {
+ if (columnsMapping.size() < readColIDs.size()) {
throw new IOException("Cannot read more columns than the given table contains.");
}
@@ -105,14 +101,15 @@ public class HiveHBaseTableInputFormat e
if (!addAll) {
for (int i : readColIDs) {
- if (i == iKey) {
+ ColumnMapping colMap = columnsMapping.get(i);
+ if (colMap.hbaseRowKey) {
continue;
}
- if (hbaseColumnQualifiers.get(i) == null) {
- scan.addFamily(hbaseColumnFamiliesBytes.get(i));
+ if (colMap.qualifierName == null) {
+ scan.addFamily(colMap.familyNameBytes);
} else {
- scan.addColumn(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i));
+ scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
}
empty = false;
@@ -125,15 +122,16 @@ public class HiveHBaseTableInputFormat e
// to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
// tables column projection.
if (empty) {
- for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
- if (i == iKey) {
+ for (int i = 0; i < columnsMapping.size(); i++) {
+ ColumnMapping colMap = columnsMapping.get(i);
+ if (colMap.hbaseRowKey) {
continue;
}
- if (hbaseColumnQualifiers.get(i) == null) {
- scan.addFamily(hbaseColumnFamiliesBytes.get(i));
+ if (colMap.qualifierName == null) {
+ scan.addFamily(colMap.familyNameBytes);
} else {
- scan.addColumn(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i));
+ scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
}
if (!addAll) {
@@ -144,10 +142,16 @@ public class HiveHBaseTableInputFormat e
// If Hive's optimizer gave us a filter to process, convert it to the
// HBase scan form now.
- tableSplit = convertFilter(jobConf, scan, tableSplit, iKey);
+ int iKey = -1;
- setScan(scan);
+ try {
+ iKey = HBaseSerDe.getRowKeyColumnOffset(columnsMapping);
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+ tableSplit = convertFilter(jobConf, scan, tableSplit, iKey);
+ setScan(scan);
Job job = new Job(jobConf);
TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
job.getConfiguration(), reporter);
@@ -363,50 +367,53 @@ public class HiveHBaseTableInputFormat e
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
- setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
+ setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
if (hbaseColumnsMapping == null) {
throw new IOException("hbase.columns.mapping required for HBase Table.");
}
- List<String> hbaseColumnFamilies = new ArrayList<String>();
- List<String> hbaseColumnQualifiers = new ArrayList<String>();
- List<byte []> hbaseColumnFamiliesBytes = new ArrayList<byte []>();
- List<byte []> hbaseColumnQualifiersBytes = new ArrayList<byte []>();
+ List<ColumnMapping> columnsMapping = null;
+ try {
+ columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
int iKey;
+
try {
- iKey = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
- hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
- } catch (SerDeException se) {
- throw new IOException(se);
+ iKey = HBaseSerDe.getRowKeyColumnOffset(columnsMapping);
+ } catch (SerDeException e) {
+ throw new IOException(e);
}
Scan scan = new Scan();
- // Take filter pushdown into account while calculating splits; this
- // allows us to prune off regions immediately. Note that although
- // the Javadoc for the superclass getSplits says that it returns one
- // split per region, the implementation actually takes the scan
- // definition into account and excludes regions which don't satisfy
- // the start/stop row conditions (HBASE-1829).
- convertFilter(jobConf, scan, null, iKey);
-
// REVIEW: are we supposed to be applying the getReadColumnIDs
// same as in getRecordReader?
- for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
- if (i == iKey) {
+ for (int i = 0; i <columnsMapping.size(); i++) {
+ ColumnMapping colMap = columnsMapping.get(i);
+ if (colMap.hbaseRowKey) {
continue;
}
- if (hbaseColumnQualifiers.get(i) == null) {
- scan.addFamily(hbaseColumnFamiliesBytes.get(i));
+ if (colMap.qualifierName == null) {
+ scan.addFamily(colMap.familyNameBytes);
} else {
- scan.addColumn(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i));
+ scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
}
}
+ // Take filter pushdown into account while calculating splits; this
+ // allows us to prune off regions immediately. Note that although
+ // the Javadoc for the superclass getSplits says that it returns one
+ // split per region, the implementation actually takes the scan
+ // definition into account and excludes regions which don't satisfy
+ // the start/stop row conditions (HBASE-1829).
+ convertFilter(jobConf, scan, null, iKey);
+
setScan(scan);
Job job = new Job(jobConf);
JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Fri Mar 9 00:47:37 2012
@@ -78,7 +78,7 @@ public class HiveHBaseTableOutputFormat
jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
final boolean walEnabled = HiveConf.getBoolVar(
jc, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
- final HTable table = new HTable(new HBaseConfiguration(jc), hbaseTableName);
+ final HTable table = new HTable(HBaseConfiguration.create(jc), hbaseTableName);
table.setAutoFlush(false);
return new RecordWriter() {
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java Fri Mar 9 00:47:37 2012
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.hbase;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Map.Entry;
@@ -41,6 +42,7 @@ public class LazyHBaseCellMap extends La
private Result result;
private byte [] columnFamilyBytes;
+ private List<Boolean> binaryStorage;
/**
* Construct a LazyCellMap object with the ObjectInspector.
@@ -50,9 +52,14 @@ public class LazyHBaseCellMap extends La
super(oi);
}
- public void init(Result r, byte [] columnFamilyBytes) {
+ public void init(
+ Result r,
+ byte [] columnFamilyBytes,
+ List<Boolean> binaryStorage) {
+
result = r;
this.columnFamilyBytes = columnFamilyBytes;
+ this.binaryStorage = binaryStorage;
setParsed(false);
}
@@ -73,10 +80,13 @@ public class LazyHBaseCellMap extends La
continue;
}
+ LazyMapObjectInspector lazyMoi = getInspector();
+
// Keys are always primitive
LazyPrimitive<? extends ObjectInspector, ? extends Writable> key =
LazyFactory.createLazyPrimitiveClass(
- (PrimitiveObjectInspector) getInspector().getMapKeyObjectInspector());
+ (PrimitiveObjectInspector) lazyMoi.getMapKeyObjectInspector(),
+ binaryStorage.get(0));
ByteArrayRef keyRef = new ByteArrayRef();
keyRef.setData(e.getKey());
@@ -84,8 +94,8 @@ public class LazyHBaseCellMap extends La
// Value
LazyObject<?> value =
- LazyFactory.createLazyObject(
- getInspector().getMapValueObjectInspector());
+ LazyFactory.createLazyObject(lazyMoi.getMapValueObjectInspector(),
+ binaryStorage.get(1));
ByteArrayRef valueRef = new ByteArrayRef();
valueRef.setData(e.getValue());
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Fri Mar 9 00:47:37 2012
@@ -23,12 +23,14 @@ import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObject;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -42,10 +44,7 @@ public class LazyHBaseRow extends LazySt
* The HBase columns mapping of the row.
*/
private Result result;
- private List<String> hbaseColumnFamilies;
- private List<byte []> hbaseColumnFamiliesBytes;
- private List<String> hbaseColumnQualifiers;
- private List<byte []> hbaseColumnQualifiersBytes;
+ private List<ColumnMapping> columnsMapping;
private ArrayList<Object> cachedList;
/**
@@ -59,18 +58,10 @@ public class LazyHBaseRow extends LazySt
* Set the HBase row data(a Result writable) for this LazyStruct.
* @see LazyHBaseRow#init(Result)
*/
- public void init(
- Result r,
- List<String> hbaseColumnFamilies,
- List<byte []> hbaseColumnFamiliesBytes,
- List<String> hbaseColumnQualifiers,
- List<byte []> hbaseColumnQualifiersBytes) {
+ public void init(Result r, List<ColumnMapping> columnsMapping) {
result = r;
- this.hbaseColumnFamilies = hbaseColumnFamilies;
- this.hbaseColumnFamiliesBytes = hbaseColumnFamiliesBytes;
- this.hbaseColumnQualifiers = hbaseColumnQualifiers;
- this.hbaseColumnQualifiersBytes = hbaseColumnQualifiersBytes;
+ this.columnsMapping = columnsMapping;
setParsed(false);
}
@@ -79,25 +70,31 @@ public class LazyHBaseRow extends LazySt
* @see LazyStruct#parse()
*/
private void parse() {
+
if (getFields() == null) {
List<? extends StructField> fieldRefs =
((StructObjectInspector)getInspector()).getAllStructFieldRefs();
- setFields(new LazyObject[fieldRefs.size()]);
- for (int i = 0; i < getFields().length; i++) {
- String hbaseColumnFamily = hbaseColumnFamilies.get(i);
- String hbaseColumnQualifier = hbaseColumnQualifiers.get(i);
+ LazyObject<? extends ObjectInspector> [] fields = new LazyObject<?>[fieldRefs.size()];
+
+ for (int i = 0; i < fields.length; i++) {
+ ColumnMapping colMap = columnsMapping.get(i);
- if (hbaseColumnQualifier == null && !HBaseSerDe.isSpecialColumn(hbaseColumnFamily)) {
+ if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
// a column family
- getFields()[i] = new LazyHBaseCellMap(
+ fields[i] = new LazyHBaseCellMap(
(LazyMapObjectInspector) fieldRefs.get(i).getFieldObjectInspector());
continue;
}
- getFields()[i] = LazyFactory.createLazyObject(fieldRefs.get(i).getFieldObjectInspector());
+ fields[i] = LazyFactory.createLazyObject(
+ fieldRefs.get(i).getFieldObjectInspector(),
+ colMap.binaryStorage.get(0));
}
- setFieldInited(new boolean[getFields().length]);
+
+ setFields(fields);
+ setFieldInited(new boolean[fields.length]);
}
+
Arrays.fill(getFieldInited(), false);
setParsed(true);
}
@@ -119,6 +116,7 @@ public class LazyHBaseRow extends LazySt
if (!getParsed()) {
parse();
}
+
return uncheckedGetField(fieldID);
}
@@ -130,25 +128,27 @@ public class LazyHBaseRow extends LazySt
* @return The value of the field
*/
private Object uncheckedGetField(int fieldID) {
- if (!getFieldInited()[fieldID]) {
- getFieldInited()[fieldID] = true;
+
+ LazyObject<?> [] fields = getFields();
+ boolean [] fieldsInited = getFieldInited();
+
+ if (!fieldsInited[fieldID]) {
+ fieldsInited[fieldID] = true;
ByteArrayRef ref = null;
- String columnFamily = hbaseColumnFamilies.get(fieldID);
- String columnQualifier = hbaseColumnQualifiers.get(fieldID);
- byte [] columnFamilyBytes = hbaseColumnFamiliesBytes.get(fieldID);
- byte [] columnQualifierBytes = hbaseColumnQualifiersBytes.get(fieldID);
+ ColumnMapping colMap = columnsMapping.get(fieldID);
- if (HBaseSerDe.isSpecialColumn(columnFamily)) {
- assert(columnQualifier == null);
+ if (colMap.hbaseRowKey) {
ref = new ByteArrayRef();
ref.setData(result.getRow());
} else {
- if (columnQualifier == null) {
+ if (colMap.qualifierName == null) {
// it is a column family
- ((LazyHBaseCellMap) getFields()[fieldID]).init(result, columnFamilyBytes);
+ // primitive type for Map<Key, Value> can be stored in binary format
+ ((LazyHBaseCellMap) fields[fieldID]).init(
+ result, colMap.familyNameBytes, colMap.binaryStorage);
} else {
// it is a column i.e. a column-family with column-qualifier
- byte [] res = result.getValue(columnFamilyBytes, columnQualifierBytes);
+ byte [] res = result.getValue(colMap.familyNameBytes, colMap.qualifierNameBytes);
if (res == null) {
return null;
@@ -160,11 +160,11 @@ public class LazyHBaseRow extends LazySt
}
if (ref != null) {
- getFields()[fieldID].init(ref, 0, ref.getData().length);
+ fields[fieldID].init(ref, 0, ref.getData().length);
}
}
- return getFields()[fieldID].getObject();
+ return fields[fieldID].getObject();
}
/**
Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java Fri Mar 9 00:47:37 2012
@@ -21,16 +21,23 @@ package org.apache.hadoop.hive.hbase;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
+import java.util.Arrays;
import junit.extensions.TestSetup;
import junit.framework.Test;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.apache.zookeeper.Watcher;
@@ -62,8 +69,7 @@ public class HBaseTestSetup extends Test
}
conf.set("hbase.rootdir", hbaseRoot);
conf.set("hbase.master", hbaseCluster.getHMasterAddress().toString());
- conf.set("hbase.zookeeper.property.clientPort",
- Integer.toString(zooKeeperPort));
+ conf.set("hbase.zookeeper.property.clientPort", Integer.toString(zooKeeperPort));
String auxJars = conf.getAuxJars();
auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://"
+ new JobConf(conf, HBaseConfiguration.class).getJar();
@@ -82,7 +88,7 @@ public class HBaseTestSetup extends Test
new File(tmpdir, "zookeeper"));
conf.set("hbase.zookeeper.property.clientPort",
Integer.toString(zooKeeperPort));
- HBaseConfiguration hbaseConf = new HBaseConfiguration(conf);
+ Configuration hbaseConf = HBaseConfiguration.create(conf);
hbaseConf.setInt("hbase.master.port", findFreePort());
hbaseConf.setInt("hbase.master.info.port", -1);
hbaseConf.setInt("hbase.regionserver.port", findFreePort());
@@ -90,7 +96,49 @@ public class HBaseTestSetup extends Test
hbaseCluster = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS);
conf.set("hbase.master", hbaseCluster.getHMasterAddress().toString());
// opening the META table ensures that cluster is running
- new HTable(new HBaseConfiguration(conf), HConstants.META_TABLE_NAME);
+ new HTable(hbaseConf, HConstants.META_TABLE_NAME);
+ createHBaseTable(hbaseConf);
+ }
+
+ private void createHBaseTable(Configuration hbaseConf) throws IOException {
+ final String HBASE_TABLE_NAME = "HiveExternalTable";
+ HTableDescriptor htableDesc = new HTableDescriptor(HBASE_TABLE_NAME.getBytes());
+ HColumnDescriptor hcolDesc = new HColumnDescriptor("cf".getBytes());
+ htableDesc.addFamily(hcolDesc);
+ HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConf);
+ if(Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)){
+ // if table is already in there, don't recreate.
+ return;
+ }
+ hbaseAdmin.createTable(htableDesc);
+ HTable htable = new HTable(hbaseConf, HBASE_TABLE_NAME);
+
+ // data
+ Put [] puts = new Put [] {
+ new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes()) };
+
+ boolean [] booleans = new boolean [] { true, false, true };
+ byte [] bytes = new byte [] { Byte.MIN_VALUE, -1, Byte.MAX_VALUE };
+ short [] shorts = new short [] { Short.MIN_VALUE, -1, Short.MAX_VALUE };
+ int [] ints = new int [] { Integer.MIN_VALUE, -1, Integer.MAX_VALUE };
+ long [] longs = new long [] { Long.MIN_VALUE, -1, Long.MAX_VALUE };
+ String [] strings = new String [] { "Hadoop, HBase,", "Hive", "Test Strings" };
+ float [] floats = new float [] { Float.MIN_VALUE, -1.0F, Float.MAX_VALUE };
+ double [] doubles = new double [] { Double.MIN_VALUE, -1.0, Double.MAX_VALUE };
+
+ // store data
+ for (int i = 0; i < puts.length; i++) {
+ puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i]));
+ puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte [] { bytes[i] });
+ puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i]));
+ puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i]));
+ puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i]));
+ puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i]));
+ puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i]));
+ puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i]));
+
+ htable.put(puts[i]);
+ }
}
private static int findFreePort() throws IOException {
Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Fri Mar 9 00:47:37 2012
@@ -50,14 +50,9 @@ import org.apache.hadoop.io.Text;
public class TestHBaseSerDe extends TestCase {
/**
- * Test the LazySimpleSerDe class.
+ * Test the default behavior of the Lazy family of objects and object inspectors.
*/
- public void testHBaseSerDe() throws SerDeException {
- // Create the SerDe
- HBaseSerDe serDe = new HBaseSerDe();
- Configuration conf = new Configuration();
- Properties tbl = createProperties();
- serDe.initialize(conf, tbl);
+ public void testHBaseSerDeI() throws SerDeException {
byte [] cfa = "cola".getBytes();
byte [] cfb = "colb".getBytes();
@@ -112,6 +107,33 @@ public class TestHBaseSerDe extends Test
new BooleanWritable(true)
};
+ // Create, initialize, and test the SerDe
+ HBaseSerDe serDe = new HBaseSerDe();
+ Configuration conf = new Configuration();
+ Properties tbl = createPropertiesI_I();
+ serDe.initialize(conf, tbl);
+
+ deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+ serDe = new HBaseSerDe();
+ conf = new Configuration();
+ tbl = createPropertiesI_II();
+ serDe.initialize(conf, tbl);
+
+ deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+ serDe = new HBaseSerDe();
+ conf = new Configuration();
+ tbl = createPropertiesI_III();
+ serDe.initialize(conf, tbl);
+
+ deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+ serDe = new HBaseSerDe();
+ conf = new Configuration();
+ tbl = createPropertiesI_IV();
+ serDe.initialize(conf, tbl);
+
deserializeAndSerialize(serDe, r, p, expectedFieldsData);
}
@@ -119,7 +141,7 @@ public class TestHBaseSerDe extends Test
// Create the SerDe
HBaseSerDe serDe = new HBaseSerDe();
Configuration conf = new Configuration();
- Properties tbl = createProperties();
+ Properties tbl = createPropertiesI_I();
long putTimestamp = 1;
tbl.setProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,
Long.toString(putTimestamp));
@@ -207,7 +229,8 @@ public class TestHBaseSerDe extends Test
assertEquals("Serialized data", p.toString(), serializedPut.toString());
}
- private Properties createProperties() {
+ // No specifications default to UTF8 String storage for backwards compatibility
+ private Properties createPropertiesI_I() {
Properties tbl = new Properties();
// Set the configuration parameters
@@ -219,4 +242,450 @@ public class TestHBaseSerDe extends Test
"cola:byte,colb:short,colc:int,cola:long,colb:float,colc:double,cola:string,colb:boolean");
return tbl;
}
+
+ // Default column storage specification inherits from table level default
+ // (in this case a missing specification) of UTF String storage
+ private Properties createPropertiesI_II() {
+ Properties tbl = new Properties();
+
+ // Set the configuration parameters
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+ tbl.setProperty("columns.types",
+ "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key#-,cola:byte#s,colb:short#-,colc:int#s,cola:long#s,colb:float#-,colc:double#-," +
+ "cola:string#s,colb:boolean#s");
+ return tbl;
+ }
+
+ // String storage type overrides table level default of binary storage
+ private Properties createPropertiesI_III() {
+ Properties tbl = new Properties();
+
+ // Set the configuration parameters
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+ tbl.setProperty("columns.types",
+ "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key#s,cola:byte#s,colb:short#s,colc:int#s,cola:long#s,colb:float#s,colc:double#s," +
+ "cola:string#s,colb:boolean#s");
+ tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+ return tbl;
+ }
+
+ // String type is never stored as anything other than an escaped string
+ // A specification of binary storage should not affect ser/de.
+ private Properties createPropertiesI_IV() {
+ Properties tbl = new Properties();
+
+ // Set the configuration parameters
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+ tbl.setProperty("columns.types",
+ "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key#-,cola:byte#s,colb:short#s,colc:int#s,cola:long#s,colb:float#s,colc:double#s," +
+ "cola:string#b,colb:boolean#s");
+ tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+ return tbl;
+ }
+
+ public void testHBaseSerDeII() throws SerDeException {
+
+ byte [] cfa = "cfa".getBytes();
+ byte [] cfb = "cfb".getBytes();
+ byte [] cfc = "cfc".getBytes();
+
+ byte [] qualByte = "byte".getBytes();
+ byte [] qualShort = "short".getBytes();
+ byte [] qualInt = "int".getBytes();
+ byte [] qualLong = "long".getBytes();
+ byte [] qualFloat = "float".getBytes();
+ byte [] qualDouble = "double".getBytes();
+ byte [] qualString = "string".getBytes();
+ byte [] qualBool = "boolean".getBytes();
+
+ byte [] rowKey = Bytes.toBytes("test-row-2");
+
+ // Data
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+ kvs.add(new KeyValue(rowKey, cfa, qualByte, new byte [] { Byte.MIN_VALUE }));
+ kvs.add(new KeyValue(rowKey, cfb, qualShort, Bytes.toBytes(Short.MIN_VALUE)));
+ kvs.add(new KeyValue(rowKey, cfc, qualInt, Bytes.toBytes(Integer.MIN_VALUE)));
+ kvs.add(new KeyValue(rowKey, cfa, qualLong, Bytes.toBytes(Long.MIN_VALUE)));
+ kvs.add(new KeyValue(rowKey, cfb, qualFloat, Bytes.toBytes(Float.MIN_VALUE)));
+ kvs.add(new KeyValue(rowKey, cfc, qualDouble, Bytes.toBytes(Double.MAX_VALUE)));
+ kvs.add(new KeyValue(rowKey, cfa, qualString, Bytes.toBytes(
+ "Hadoop, HBase, and Hive Again!")));
+ kvs.add(new KeyValue(rowKey, cfb, qualBool, Bytes.toBytes(false)));
+
+ Collections.sort(kvs, KeyValue.COMPARATOR);
+ Result r = new Result(kvs);
+
+ Put p = new Put(rowKey);
+
+ p.add(cfa, qualByte, new byte [] { Byte.MIN_VALUE });
+ p.add(cfb, qualShort, Bytes.toBytes(Short.MIN_VALUE));
+ p.add(cfc, qualInt, Bytes.toBytes(Integer.MIN_VALUE));
+ p.add(cfa, qualLong, Bytes.toBytes(Long.MIN_VALUE));
+ p.add(cfb, qualFloat, Bytes.toBytes(Float.MIN_VALUE));
+ p.add(cfc, qualDouble, Bytes.toBytes(Double.MAX_VALUE));
+ p.add(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive Again!"));
+ p.add(cfb, qualBool, Bytes.toBytes(false));
+
+ Object[] expectedFieldsData = {
+ new Text("test-row-2"),
+ new ByteWritable(Byte.MIN_VALUE),
+ new ShortWritable(Short.MIN_VALUE),
+ new IntWritable(Integer.MIN_VALUE),
+ new LongWritable(Long.MIN_VALUE),
+ new FloatWritable(Float.MIN_VALUE),
+ new DoubleWritable(Double.MAX_VALUE),
+ new Text("Hadoop, HBase, and Hive Again!"),
+ new BooleanWritable(false)
+ };
+
+ // Create, initialize, and test the SerDe
+ HBaseSerDe serDe = new HBaseSerDe();
+ Configuration conf = new Configuration();
+ Properties tbl = createPropertiesII_I();
+ serDe.initialize(conf, tbl);
+
+ deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+ serDe = new HBaseSerDe();
+ conf = new Configuration();
+ tbl = createPropertiesII_II();
+ serDe.initialize(conf, tbl);
+
+ deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+ serDe = new HBaseSerDe();
+ conf = new Configuration();
+ tbl = createPropertiesII_III();
+ serDe.initialize(conf, tbl);
+
+ deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+ }
+
+ private Properties createPropertiesII_I() {
+ Properties tbl = new Properties();
+
+ // Set the configuration parameters
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+ tbl.setProperty("columns.types",
+ "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key#-,cfa:byte#b,cfb:short#b,cfc:int#-,cfa:long#b,cfb:float#-,cfc:double#b," +
+ "cfa:string#b,cfb:boolean#-");
+ tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+ return tbl;
+ }
+
+ /**
+  * Builds table properties for the binary-storage round trip, variant II.
+  * Every column, including the row key, is explicitly marked "#b", while the
+  * table-wide default storage type is "string". The explicit per-column suffixes
+  * therefore have to take effect over the default.
+  *
+  * @return table properties to pass to HBaseSerDe.initialize
+  */
+ private Properties createPropertiesII_II() {
+   Properties tbl = new Properties();
+
+   // Set the configuration parameters
+   tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+   tbl.setProperty(Constants.LIST_COLUMNS,
+       "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+   // Column types are ':'-separated throughout, like the other property builders.
+   tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+       "string:tinyint:smallint:int:bigint:float:double:string:boolean");
+   tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+       ":key#b,cfa:byte#b,cfb:short#b,cfc:int#b,cfa:long#b,cfb:float#b,cfc:double#b," +
+       "cfa:string#b,cfb:boolean#b");
+   tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string");
+   return tbl;
+ }
+
+ /**
+  * Builds table properties for the binary-storage round trip, variant III.
+  * No table-wide default storage type is set. The key and the string column use
+  * "#-", and all other columns are explicitly "#b".
+  *
+  * @return table properties to pass to HBaseSerDe.initialize
+  */
+ private Properties createPropertiesII_III() {
+   Properties tbl = new Properties();
+
+   // Set the configuration parameters
+   tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+   tbl.setProperty(Constants.LIST_COLUMNS,
+       "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+   // Column types are ':'-separated throughout, like the other property builders.
+   tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+       "string:tinyint:smallint:int:bigint:float:double:string:boolean");
+   tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+       ":key#-,cfa:byte#b,cfb:short#b,cfc:int#b,cfa:long#b,cfb:float#b,cfc:double#b," +
+       "cfa:string#-,cfb:boolean#b");
+   return tbl;
+ }
+
+ /**
+  * Round-trips three rows through HBaseSerDe. Each HBase column family holds one
+  * binary-encoded primitive type and is mapped to a Hive map column.
+  * Deserialized fields are checked against expected Writables, and the serialized
+  * Put is checked for the original row key and family count. The same data is
+  * verified under two table configurations.
+  */
+ public void testHBaseSerDeWithHiveMapToHBaseColumnFamily() throws SerDeException {
+
+   // Bytes.toBytes encodes UTF-8 regardless of the platform default charset
+   // that String.getBytes() would use.
+   byte [] cfint = Bytes.toBytes("cf-int");
+   byte [] cfbyte = Bytes.toBytes("cf-byte");
+   byte [] cfshort = Bytes.toBytes("cf-short");
+   byte [] cflong = Bytes.toBytes("cf-long");
+   byte [] cffloat = Bytes.toBytes("cf-float");
+   byte [] cfdouble = Bytes.toBytes("cf-double");
+   byte [] cfbool = Bytes.toBytes("cf-bool");
+
+   byte [][] columnFamilies =
+       new byte [][] {cfint, cfbyte, cfshort, cflong, cffloat, cfdouble, cfbool};
+
+   byte [][] rowKeys = new byte [][] {
+       Bytes.toBytes(Integer.toString(1)),
+       Bytes.toBytes(Integer.toString(Integer.MIN_VALUE)),
+       Bytes.toBytes(Integer.toString(Integer.MAX_VALUE))
+   };
+
+   // One entry per row. Within a row, element j is both the qualifier and the
+   // value of the single cell stored in columnFamilies[j].
+   byte [][][] columnQualifiersAndValues = new byte [][][] {
+       {Bytes.toBytes(1), new byte [] {1}, Bytes.toBytes((short) 1),
+           Bytes.toBytes(1L), Bytes.toBytes(1.0F), Bytes.toBytes(1.0),
+           Bytes.toBytes(true)},
+       {Bytes.toBytes(Integer.MIN_VALUE), new byte [] {Byte.MIN_VALUE},
+           Bytes.toBytes(Short.MIN_VALUE), Bytes.toBytes(Long.MIN_VALUE),
+           Bytes.toBytes(Float.MIN_VALUE), Bytes.toBytes(Double.MIN_VALUE),
+           Bytes.toBytes(false)},
+       {Bytes.toBytes(Integer.MAX_VALUE), new byte [] {Byte.MAX_VALUE},
+           Bytes.toBytes(Short.MAX_VALUE), Bytes.toBytes(Long.MAX_VALUE),
+           Bytes.toBytes(Float.MAX_VALUE), Bytes.toBytes(Double.MAX_VALUE),
+           Bytes.toBytes(true)}
+   };
+
+   Result [] r = new Result [rowKeys.length];
+   Put [] p = new Put [rowKeys.length];
+
+   for (int i = 0; i < rowKeys.length; i++) {
+     // A fresh list per row, so each Result's KeyValues are independent of the
+     // next row even if the Result constructor keeps a reference to the list.
+     List<KeyValue> kvs = new ArrayList<KeyValue>();
+     p[i] = new Put(rowKeys[i]);
+
+     for (int j = 0; j < columnQualifiersAndValues[i].length; j++) {
+       byte [] qualifierAndValue = columnQualifiersAndValues[i][j];
+       kvs.add(new KeyValue(rowKeys[i], columnFamilies[j], qualifierAndValue,
+           qualifierAndValue));
+       p[i].add(columnFamilies[j], qualifierAndValue, qualifierAndValue);
+     }
+
+     r[i] = new Result(kvs);
+   }
+
+   Object [][] expectedData = {
+       {new Text(Integer.toString(1)), new IntWritable(1), new ByteWritable((byte) 1),
+           new ShortWritable((short) 1), new LongWritable(1), new FloatWritable(1.0F),
+           new DoubleWritable(1.0), new BooleanWritable(true)},
+       {new Text(Integer.toString(Integer.MIN_VALUE)), new IntWritable(Integer.MIN_VALUE),
+           new ByteWritable(Byte.MIN_VALUE), new ShortWritable(Short.MIN_VALUE),
+           new LongWritable(Long.MIN_VALUE), new FloatWritable(Float.MIN_VALUE),
+           new DoubleWritable(Double.MIN_VALUE), new BooleanWritable(false)},
+       {new Text(Integer.toString(Integer.MAX_VALUE)), new IntWritable(Integer.MAX_VALUE),
+           new ByteWritable(Byte.MAX_VALUE), new ShortWritable(Short.MAX_VALUE),
+           new LongWritable(Long.MAX_VALUE), new FloatWritable(Float.MAX_VALUE),
+           new DoubleWritable(Double.MAX_VALUE), new BooleanWritable(true)}};
+
+   HBaseSerDe hbaseSerDe = new HBaseSerDe();
+   Configuration conf = new Configuration();
+   Properties tbl = createPropertiesForHiveMapHBaseColumnFamily();
+   hbaseSerDe.initialize(conf, tbl);
+
+   deserializeAndSerializeHiveMapHBaseColumnFamily(hbaseSerDe, r, p, expectedData, rowKeys,
+       columnFamilies, columnQualifiersAndValues);
+
+   hbaseSerDe = new HBaseSerDe();
+   conf = new Configuration();
+   tbl = createPropertiesForHiveMapHBaseColumnFamilyII();
+   hbaseSerDe.initialize(conf, tbl);
+
+   deserializeAndSerializeHiveMapHBaseColumnFamily(hbaseSerDe, r, p, expectedData, rowKeys,
+       columnFamilies, columnQualifiersAndValues);
+ }
+
+ /**
+  * For each Result, deserializes it with the given SerDe and serializes the row
+  * back into a Put. It then checks that:
+  * <ul>
+  *   <li>the serialized row key equals rowKeys[i] byte-for-byte, including length;</li>
+  *   <li>the Put has one family per entry of columnFamilies;</li>
+  *   <li>every struct field matches expectedData[i][j].</li>
+  * </ul>
+  * For map fields, the expected value is also used as the lookup key. This
+  * relies on the test data using the same bytes for each cell's qualifier and
+  * value.
+  *
+  * NOTE(review): p and columnQualifiersAndValues are currently unused; the
+  * serialized Put is not compared against p. They are kept for signature
+  * compatibility with existing callers.
+  */
+ private void deserializeAndSerializeHiveMapHBaseColumnFamily(
+     HBaseSerDe hbaseSerDe,
+     Result [] r,
+     Put [] p,
+     Object [][] expectedData,
+     byte [][] rowKeys,
+     byte [][] columnFamilies,
+     byte [][][] columnQualifiersAndValues) throws SerDeException {
+
+   StructObjectInspector soi = (StructObjectInspector) hbaseSerDe.getObjectInspector();
+   List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+   assertEquals(8, fieldRefs.size());
+
+   for (int i = 0; i < r.length; i++) {
+     // Round trip: deserialize the Result, then serialize the row back into a Put.
+     Object row = hbaseSerDe.deserialize(r[i]);
+     Put serializedPut = (Put) hbaseSerDe.serialize(row, soi);
+
+     // Check the length first: a byte loop alone would accept a truncated key
+     // and throw on an over-long one.
+     byte [] rowKey = serializedPut.getRow();
+     assertEquals("Serialized row key length", rowKeys[i].length, rowKey.length);
+     for (int k = 0; k < rowKey.length; k++) {
+       assertEquals(rowKeys[i][k], rowKey[k]);
+     }
+
+     assertEquals(columnFamilies.length, serializedPut.numFamilies());
+
+     for (int j = 0; j < fieldRefs.size(); j++) {
+       Object fieldData = soi.getStructFieldData(row, fieldRefs.get(j));
+
+       assertNotNull(fieldData);
+
+       if (fieldData instanceof LazyPrimitive<?, ?>) {
+         assertEquals(expectedData[i][j],
+             ((LazyPrimitive<?, ?>) fieldData).getWritableObject());
+       } else if (fieldData instanceof LazyHBaseCellMap) {
+         LazyPrimitive<?, ?> lazyPrimitive = (LazyPrimitive<?, ?>)
+             ((LazyHBaseCellMap) fieldData).getMapValueElement(expectedData[i][j]);
+         // Report a missing key as an assertion failure rather than an NPE.
+         assertNotNull("No map entry for key " + expectedData[i][j], lazyPrimitive);
+         assertEquals(expectedData[i][j], lazyPrimitive.getWritableObject());
+       } else {
+         fail("Error: field data not an instance of LazyPrimitive<?, ?> or LazyHBaseCellMap");
+       }
+     }
+   }
+ }
+
+ private Properties createPropertiesForHiveMapHBaseColumnFamily() {
+ Properties tbl = new Properties();
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty(Constants.LIST_COLUMNS,
+ "key,valint,valbyte,valshort,vallong,valfloat,valdouble,valbool");
+ tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+ "string:map<int,int>:map<tinyint,tinyint>:map<smallint,smallint>:map<bigint,bigint>:"
+ + "map<float,float>:map<double,double>:map<boolean,boolean>");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key#-,cf-int:#b:b,cf-byte:#b:b,cf-short:#b:b,cf-long:#b:b,cf-float:#b:b,cf-double:#b:b," +
+ "cf-bool:#b:b");
+ return tbl;
+ }
+
+ private Properties createPropertiesForHiveMapHBaseColumnFamilyII() {
+ Properties tbl = new Properties();
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty(Constants.LIST_COLUMNS,
+ "key,valint,valbyte,valshort,vallong,valfloat,valdouble,valbool");
+ tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+ "string:map<int,int>:map<tinyint,tinyint>:map<smallint,smallint>:map<bigint,bigint>:"
+ + "map<float,float>:map<double,double>:map<boolean,boolean>");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key#-,cf-int:#-:-,cf-byte:#-:-,cf-short:#-:-,cf-long:#-:-,cf-float:#-:-,cf-double:#-:-," +
+ "cf-bool:#-:-");
+ tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+ return tbl;
+ }
+
+ /**
+  * Round-trips a single row with eight column families, each mapped to a Hive
+  * map of one primitive type. Every cell's qualifier and value are the same
+  * UTF-8 text. Deserialized fields are checked against expected Writables, and
+  * the re-serialized Put is compared with the original. The check runs under
+  * two column-mapping configurations.
+  */
+ public void testHBaseSerDeWithHiveMapToHBaseColumnFamilyII() throws SerDeException {
+
+   // Bytes.toBytes encodes UTF-8, matching the row key below, instead of the
+   // platform default charset that String.getBytes() would use.
+   byte [] cfbyte = Bytes.toBytes("cf-byte");
+   byte [] cfshort = Bytes.toBytes("cf-short");
+   byte [] cfint = Bytes.toBytes("cf-int");
+   byte [] cflong = Bytes.toBytes("cf-long");
+   byte [] cffloat = Bytes.toBytes("cf-float");
+   byte [] cfdouble = Bytes.toBytes("cf-double");
+   byte [] cfstring = Bytes.toBytes("cf-string");
+   byte [] cfbool = Bytes.toBytes("cf-bool");
+
+   byte [][] columnFamilies =
+       new byte [][] {cfbyte, cfshort, cfint, cflong, cffloat, cfdouble, cfstring, cfbool};
+
+   byte [] rowKey = Bytes.toBytes("row-key");
+
+   // Element j is both the qualifier and the value of the single cell stored in
+   // columnFamilies[j].
+   byte [][] columnQualifiersAndValues = new byte [][] {
+       Bytes.toBytes("123"), Bytes.toBytes("456"), Bytes.toBytes("789"), Bytes.toBytes("1000"),
+       Bytes.toBytes("-0.01"), Bytes.toBytes("5.3"), Bytes.toBytes("Hive"),
+       Bytes.toBytes("true")
+   };
+
+   Put p = new Put(rowKey);
+   List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+   for (int j = 0; j < columnQualifiersAndValues.length; j++) {
+     kvs.add(new KeyValue(rowKey,
+         columnFamilies[j], columnQualifiersAndValues[j], columnQualifiersAndValues[j]));
+     p.add(columnFamilies[j], columnQualifiersAndValues[j], columnQualifiersAndValues[j]);
+   }
+
+   Result r = new Result(kvs);
+
+   Object [] expectedData = {
+       new Text("row-key"), new ByteWritable((byte) 123), new ShortWritable((short) 456),
+       new IntWritable(789), new LongWritable(1000), new FloatWritable(-0.01F),
+       new DoubleWritable(5.3), new Text("Hive"), new BooleanWritable(true)
+   };
+
+   HBaseSerDe hbaseSerDe = new HBaseSerDe();
+   Configuration conf = new Configuration();
+   Properties tbl = createPropertiesForHiveMapHBaseColumnFamilyII_I();
+   hbaseSerDe.initialize(conf, tbl);
+
+   deserializeAndSerializeHiveMapHBaseColumnFamilyII(hbaseSerDe, r, p, expectedData,
+       columnFamilies, columnQualifiersAndValues);
+
+   hbaseSerDe = new HBaseSerDe();
+   conf = new Configuration();
+   tbl = createPropertiesForHiveMapHBaseColumnFamilyII_II();
+   hbaseSerDe.initialize(conf, tbl);
+
+   deserializeAndSerializeHiveMapHBaseColumnFamilyII(hbaseSerDe, r, p, expectedData,
+       columnFamilies, columnQualifiersAndValues);
+ }
+
+ private Properties createPropertiesForHiveMapHBaseColumnFamilyII_I() {
+ Properties tbl = new Properties();
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty(Constants.LIST_COLUMNS,
+ "key,valbyte,valshort,valint,vallong,valfloat,valdouble,valstring,valbool");
+ tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+ "string:map<tinyint,tinyint>:map<smallint,smallint>:map<int,int>:map<bigint,bigint>:"
+ + "map<float,float>:map<double,double>:map<string,string>:map<boolean,boolean>");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key#s,cf-byte:#-:s,cf-short:#s:-,cf-int:#s:s,cf-long:#-:-,cf-float:#s:-,cf-double:#-:s," +
+ "cf-string:#s:s,cf-bool:#-:-");
+ return tbl;
+ }
+
+ private Properties createPropertiesForHiveMapHBaseColumnFamilyII_II() {
+ Properties tbl = new Properties();
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty(Constants.LIST_COLUMNS,
+ "key,valbyte,valshort,valint,vallong,valfloat,valdouble,valstring,valbool");
+ tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+ "string:map<tinyint,tinyint>:map<smallint,smallint>:map<int,int>:map<bigint,bigint>:"
+ + "map<float,float>:map<double,double>:map<string,string>:map<boolean,boolean>");
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key#s,cf-byte:#s:s,cf-short:#s:s,cf-int:#s:s,cf-long:#s:s,cf-float:#s:s,cf-double:#s:s," +
+ "cf-string:#s:s,cf-bool:#s:s");
+ tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+ return tbl;
+ }
+
+ /**
+  * Deserializes one Result with the given SerDe and checks every struct field
+  * against expectedData. For map fields, the expected value is also the lookup
+  * key, which relies on the test data using identical qualifier and value
+  * bytes. The row is then serialized back, and the resulting Put is compared
+  * with p via toString().
+  *
+  * NOTE(review): columnFamilies and columnQualifiersAndValues are currently
+  * unused. They are kept for signature compatibility with existing callers.
+  */
+ private void deserializeAndSerializeHiveMapHBaseColumnFamilyII(
+     HBaseSerDe hbaseSerDe,
+     Result r,
+     Put p,
+     Object [] expectedData,
+     byte [][] columnFamilies,
+     byte [][] columnQualifiersAndValues) throws SerDeException {
+
+   StructObjectInspector soi = (StructObjectInspector) hbaseSerDe.getObjectInspector();
+   List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+   assertEquals(9, fieldRefs.size());
+
+   // Deserialize
+   Object row = hbaseSerDe.deserialize(r);
+
+   for (int j = 0; j < fieldRefs.size(); j++) {
+     Object fieldData = soi.getStructFieldData(row, fieldRefs.get(j));
+     assertNotNull(fieldData);
+
+     if (fieldData instanceof LazyPrimitive<?, ?>) {
+       assertEquals(expectedData[j], ((LazyPrimitive<?, ?>) fieldData).getWritableObject());
+     } else if (fieldData instanceof LazyHBaseCellMap) {
+       LazyPrimitive<?, ?> lazyPrimitive = (LazyPrimitive<?, ?>)
+           ((LazyHBaseCellMap) fieldData).getMapValueElement(expectedData[j]);
+       // Report a missing key as an assertion failure rather than an NPE.
+       assertNotNull("No map entry for key " + expectedData[j], lazyPrimitive);
+       assertEquals(expectedData[j], lazyPrimitive.getWritableObject());
+     } else {
+       fail("Error: field data not an instance of LazyPrimitive<?, ?> or LazyHBaseCellMap");
+     }
+   }
+
+   // Serialize
+   Put serializedPut = (Put) hbaseSerDe.serialize(row, soi);
+   assertEquals("Serialized data: ", p.toString(), serializedPut.toString());
+ }
}