You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/03/19 02:52:11 UTC
svn commit: r1579114 - in /hive/trunk/hbase-handler/src:
java/org/apache/hadoop/hive/hbase/ test/org/apache/hadoop/hive/hbase/
Author: xuefu
Date: Wed Mar 19 01:52:11 2014
New Revision: 1579114
URL: http://svn.apache.org/r1579114
Log:
HIVE-6677: HBaseSerDe needs to be refactored (reviewed by Prasad)
Added:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1579114&r1=1579113&r2=1579114&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Wed Mar 19 01:52:11 2014
@@ -26,15 +26,12 @@ import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -45,9 +42,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,6 +54,7 @@ import org.apache.hadoop.io.Writable;
* deserialize objects from an HBase table.
*/
public class HBaseSerDe extends AbstractSerDe {
+ public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
public static final String HBASE_COLUMNS_MAPPING = "hbase.columns.mapping";
public static final String HBASE_TABLE_NAME = "hbase.table.name";
@@ -70,44 +65,27 @@ public class HBaseSerDe extends Abstract
public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
-
- /** Determines whether a regex matching should be done on the columns or not. Defaults to true.
- * <strong>WARNING: Note that currently this only supports the suffix wildcard .*</strong> **/
+ /**
+ * Determines whether a regex matching should be done on the columns or not. Defaults to true.
+ * <strong>WARNING: Note that currently this only supports the suffix wildcard .*</strong>
+ */
public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching";
- public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
-
private ObjectInspector cachedObjectInspector;
- private String hbaseColumnsMapping;
- private boolean doColumnRegexMatching;
- private List<ColumnMapping> columnsMapping;
- private SerDeParameters serdeParams;
- private boolean useJSONSerialize;
private LazyHBaseRow cachedHBaseRow;
- private final ByteStream.Output serializeStream = new ByteStream.Output();
- private int iKey;
- private long putTimestamp;
- private Class<?> compositeKeyClass;
private Object compositeKeyObj;
- // used for serializing a field
- private byte [] separators; // the separators array
- private boolean escaped; // whether we need to escape the data when writing out
- private byte escapeChar; // which char to use as the escape char, e.g. '\\'
- private boolean [] needsEscape; // which chars need to be escaped. This array should have size
- // of 128. Negative byte values (or byte values >= 128) are
- // never escaped.
+ private HBaseSerDeParameters serdeParams;
+
@Override
public String toString() {
return getClass().toString()
+ "["
- + hbaseColumnsMapping
+ + serdeParams.getColumnMappingString()
+ ":"
- + ((StructTypeInfo) serdeParams.getRowTypeInfo())
- .getAllStructFieldNames()
+ + serdeParams.getRowTypeInfo().getAllStructFieldNames()
+ ":"
- + ((StructTypeInfo) serdeParams.getRowTypeInfo())
- .getAllStructFieldTypeInfos() + "]";
+ + serdeParams.getRowTypeInfo().getAllStructFieldTypeInfos() + "]";
}
public HBaseSerDe() throws SerDeException {
@@ -120,8 +98,8 @@ public class HBaseSerDe extends Abstract
@Override
public void initialize(Configuration conf, Properties tbl)
throws SerDeException {
-
- initHBaseSerDeParameters(conf, tbl, getClass().getName());
+ serdeParams = new HBaseSerDeParameters();
+ serdeParams.init(conf, tbl, getClass().getName());
cachedObjectInspector = LazyFactory.createLazyStructInspector(
serdeParams.getColumnNames(),
@@ -132,10 +110,9 @@ public class HBaseSerDe extends Abstract
serdeParams.isEscaped(),
serdeParams.getEscapeChar());
- cachedHBaseRow = new LazyHBaseRow(
- (LazySimpleStructObjectInspector) cachedObjectInspector);
+ cachedHBaseRow = new LazyHBaseRow((LazySimpleStructObjectInspector) cachedObjectInspector);
- if (compositeKeyClass != null) {
+ if (serdeParams.getCompositeKeyClass() != null) {
// initialize the constructor of the composite key class with its object inspector
initCompositeKeyClass(conf,tbl);
}
@@ -146,7 +123,7 @@ public class HBaseSerDe extends Abstract
+ " columnTypes = "
+ serdeParams.getColumnTypes()
+ " hbaseColumnMapping = "
- + hbaseColumnsMapping);
+ + serdeParams.getColumnMappingString());
}
}
@@ -156,20 +133,6 @@ public class HBaseSerDe extends Abstract
* columns maps to the HBase row key, by default the first column.
*
* @param columnsMappingSpec string hbase.columns.mapping specified when creating table
- * @return List<ColumnMapping> which contains the column mapping information by position
- * @throws SerDeException
- */
- public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
- throws SerDeException {
- return parseColumnsMapping(columnsMappingSpec, true);
- }
-
- /**
- * Parses the HBase columns mapping specifier to identify the column families, qualifiers
- * and also caches the byte arrays corresponding to them. One of the Hive table
- * columns maps to the HBase row key, by default the first column.
- *
- * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
* @param doColumnRegexMatching whether to do a regex matching on the columns or not
* @return List<ColumnMapping> which contains the column mapping information by position
* @throws SerDeException
@@ -261,186 +224,6 @@ public class HBaseSerDe extends Abstract
return columnsMapping;
}
- /*
- * Utility method for parsing a string of the form '-,b,s,-,s:b,...' as a means of specifying
- * whether to use a binary or an UTF string format to serialize and de-serialize primitive
- * data types like boolean, byte, short, int, long, float, and double. This applies to
- * regular columns and also to map column types which are associated with an HBase column
- * family. For the map types, we apply the specification to the key or the value provided it
- * is one of the above primitive types. The specifier is a colon separated value of the form
- * -:s, or b:b where we have 's', 'b', or '-' on either side of the colon. 's' is for string
- * format storage, 'b' is for native fixed width byte oriented storage, and '-' uses the
- * table level default.
- *
- * @param hbaseTableDefaultStorageType - the specification associated with the table property
- * hbase.table.default.storage.type
- * @throws SerDeException on parse error.
- */
-
- private void parseColumnStorageTypes(String hbaseTableDefaultStorageType)
- throws SerDeException {
-
- boolean tableBinaryStorage = false;
-
- if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) {
- if (hbaseTableDefaultStorageType.equals("binary")) {
- tableBinaryStorage = true;
- } else if (!hbaseTableDefaultStorageType.equals("string")) {
- throw new SerDeException("Error: " + HBASE_TABLE_DEFAULT_STORAGE_TYPE +
- " parameter must be specified as" +
- " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
- "' is not a valid specification for this table/serde property.");
- }
- }
-
- // parse the string to determine column level storage type for primitive types
- // 's' is for variable length string format storage
- // 'b' is for fixed width binary storage of bytes
- // '-' is for table storage type, which defaults to UTF8 string
- // string data is always stored in the default escaped storage format; the data types
- // byte, short, int, long, float, and double have a binary byte oriented storage option
- List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
-
- for (int i = 0; i < columnsMapping.size(); i++) {
-
- ColumnMapping colMap = columnsMapping.get(i);
- TypeInfo colType = columnTypes.get(i);
- String mappingSpec = colMap.mappingSpec;
- String [] mapInfo = mappingSpec.split("#");
- String [] storageInfo = null;
-
- if (mapInfo.length == 2) {
- storageInfo = mapInfo[1].split(":");
- }
-
- if (storageInfo == null) {
-
- // use the table default storage specification
- if (colType.getCategory() == Category.PRIMITIVE) {
- if (!colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else if (colType.getCategory() == Category.MAP) {
- TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
- TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
-
- if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
- !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else {
- colMap.binaryStorage.add(false);
- }
-
- if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
- !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else {
- colMap.binaryStorage.add(false);
- }
-
- } else if (storageInfo.length == 1) {
- // we have a storage specification for a primitive column type
- String storageOption = storageInfo[0];
-
- if ((colType.getCategory() == Category.MAP) ||
- !(storageOption.equals("-") || "string".startsWith(storageOption) ||
- "binary".startsWith(storageOption))) {
- throw new SerDeException("Error: A column storage specification is one of the following:"
- + " '-', a prefix of 'string', or a prefix of 'binary'. "
- + storageOption + " is not a valid storage option specification for "
- + serdeParams.getColumnNames().get(i));
- }
-
- if (colType.getCategory() == Category.PRIMITIVE &&
- !colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-
- if ("-".equals(storageOption)) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else if ("binary".startsWith(storageOption)) {
- colMap.binaryStorage.add(true);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else {
- colMap.binaryStorage.add(false);
- }
-
- } else if (storageInfo.length == 2) {
- // we have a storage specification for a map column type
-
- String keyStorage = storageInfo[0];
- String valStorage = storageInfo[1];
-
- if ((colType.getCategory() != Category.MAP) ||
- !(keyStorage.equals("-") || "string".startsWith(keyStorage) ||
- "binary".startsWith(keyStorage)) ||
- !(valStorage.equals("-") || "string".startsWith(valStorage) ||
- "binary".startsWith(valStorage))) {
- throw new SerDeException("Error: To specify a valid column storage type for a Map"
- + " column, use any two specifiers from '-', a prefix of 'string', "
- + " and a prefix of 'binary' separated by a ':'."
- + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the"
- + " key and value parts of the Map<?,?> respectively."
- + " Invalid storage specification for column "
- + serdeParams.getColumnNames().get(i)
- + "; " + storageInfo[0] + ":" + storageInfo[1]);
- }
-
- TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
- TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
-
- if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
- !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-
- if (keyStorage.equals("-")) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else if ("binary".startsWith(keyStorage)) {
- colMap.binaryStorage.add(true);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else {
- colMap.binaryStorage.add(false);
- }
-
- if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
- !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
- if (valStorage.equals("-")) {
- colMap.binaryStorage.add(tableBinaryStorage);
- } else if ("binary".startsWith(valStorage)) {
- colMap.binaryStorage.add(true);
- } else {
- colMap.binaryStorage.add(false);
- }
- } else {
- colMap.binaryStorage.add(false);
- }
-
- if (colMap.binaryStorage.size() != 2) {
- throw new SerDeException("Error: In parsing the storage specification for column "
- + serdeParams.getColumnNames().get(i));
- }
-
- } else {
- // error in storage specification
- throw new SerDeException("Error: " + HBASE_COLUMNS_MAPPING + " storage specification "
- + mappingSpec + " is not valid for column: "
- + serdeParams.getColumnNames().get(i));
- }
- }
- }
-
-
- public static boolean isRowKeyColumn(String hbaseColumnName) {
- return hbaseColumnName.equals(HBASE_KEY_COL);
- }
-
-
static class ColumnMapping {
ColumnMapping() {
@@ -458,97 +241,6 @@ public class HBaseSerDe extends Abstract
byte[] qualifierPrefixBytes;
}
- private void initHBaseSerDeParameters(
- Configuration job, Properties tbl, String serdeName)
- throws SerDeException {
-
- // Read configuration parameters
- hbaseColumnsMapping = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
- String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
- putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
-
- doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));
-
- String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
-
- if (compKeyClass != null) {
- try {
- compositeKeyClass = job.getClassByName(compKeyClass);
- } catch (ClassNotFoundException e) {
- throw new SerDeException(e);
- }
- }
-
- // Parse and initialize the HBase columns mapping
- columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
-
- // Build the type property string if not supplied
- if (columnTypeProperty == null) {
- StringBuilder sb = new StringBuilder();
-
- for (int i = 0; i < columnsMapping.size(); i++) {
- if (sb.length() > 0) {
- sb.append(":");
- }
-
- ColumnMapping colMap = columnsMapping.get(i);
-
- if (colMap.hbaseRowKey) {
- // the row key column becomes a STRING
- sb.append(serdeConstants.STRING_TYPE_NAME);
- } else if (colMap.qualifierName == null) {
- // a column family become a MAP
- sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ","
- + serdeConstants.STRING_TYPE_NAME + ">");
- } else {
- // an individual column becomes a STRING
- sb.append(serdeConstants.STRING_TYPE_NAME);
- }
- }
- tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, sb.toString());
- }
-
- serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
-
- if (columnsMapping.size() != serdeParams.getColumnNames().size()) {
- throw new SerDeException(serdeName + ": columns has " +
- serdeParams.getColumnNames().size() +
- " elements while hbase.columns.mapping has " +
- columnsMapping.size() + " elements" +
- " (counting the key if implicit)");
- }
-
- separators = serdeParams.getSeparators();
- escaped = serdeParams.isEscaped();
- escapeChar = serdeParams.getEscapeChar();
- needsEscape = serdeParams.getNeedsEscape();
-
- // check that the mapping schema is right;
- // check that the "column-family:" is mapped to Map<key,?>
- // where key extends LazyPrimitive<?, ?> and thus has type Category.PRIMITIVE
- for (int i = 0; i < columnsMapping.size(); i++) {
- ColumnMapping colMap = columnsMapping.get(i);
- if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
- TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
- if ((typeInfo.getCategory() != Category.MAP) ||
- (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
- != Category.PRIMITIVE)) {
-
- throw new SerDeException(
- serdeName + ": hbase column family '" + colMap.familyName
- + "' should be mapped to Map<? extends LazyPrimitive<?, ?>,?>, that is "
- + "the Key for the map should be of primitive type, but is mapped to "
- + typeInfo.getTypeName());
- }
- }
- }
-
- // Precondition: make sure this is done after the rest of the SerDe initialization is done.
- String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
- parseColumnStorageTypes(hbaseTableStorageType);
- setKeyColumnOffset();
- }
-
/**
* Deserialize a row from the HBase Result writable to a LazyObject
* @param result the HBase Result Writable containing the row
@@ -557,13 +249,12 @@ public class HBaseSerDe extends Abstract
*/
@Override
public Object deserialize(Writable result) throws SerDeException {
-
if (!(result instanceof ResultWritable)) {
throw new SerDeException(getClass().getName() + ": expects ResultWritable!");
}
- cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping, compositeKeyObj);
-
+ cachedHBaseRow.init(((ResultWritable) result).getResult(), serdeParams.getColumnMapping(),
+ compositeKeyObj);
return cachedHBaseRow;
}
@@ -591,34 +282,43 @@ public class HBaseSerDe extends Abstract
List<? extends StructField> fields = soi.getAllStructFieldRefs();
List<Object> list = soi.getStructFieldsDataAsList(obj);
List<? extends StructField> declaredFields =
- (serdeParams.getRowTypeInfo() != null &&
- ((StructTypeInfo) serdeParams.getRowTypeInfo())
- .getAllStructFieldNames().size() > 0) ?
- ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs()
- : null;
-
- Put put = null;
+ ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs();
+ int iKey = serdeParams.getKeyIndex();
+ StructField field = fields.get(iKey);
+ Object value = list.get(iKey);
+ StructField declaredField = declaredFields.get(iKey);
+ byte[] key;
try {
- byte [] key = serializeField(iKey, null, fields, list, declaredFields);
+ key = serializeKeyField(field, value, declaredField, serdeParams);
+ } catch (IOException ex) {
+ throw new SerDeException(ex);
+ }
- if (key == null) {
- throw new SerDeException("HBase row key cannot be NULL");
- }
+ if (key == null) {
+ throw new SerDeException("HBase row key cannot be NULL");
+ }
- if(putTimestamp >= 0) {
- put = new Put(key,putTimestamp);
- } else {
- put = new Put(key);
- }
+ Put put = null;
+ long putTimestamp = serdeParams.getPutTimestamp();
+ if(putTimestamp >= 0) {
+ put = new Put(key, putTimestamp);
+ } else {
+ put = new Put(key);
+ }
+ try {
// Serialize each field
for (int i = 0; i < fields.size(); i++) {
if (i == iKey) {
- // already processed the key above
continue;
}
- serializeField(i, put, fields, list, declaredFields);
+
+ field = fields.get(i);
+ value = list.get(i);
+ declaredField = declaredFields.get(i);
+ ColumnMapping colMap = serdeParams.getColumnMapping().get(i);
+ serializeField(put, field, value, declaredField, colMap);
}
} catch (IOException e) {
throw new SerDeException(e);
@@ -627,99 +327,97 @@ public class HBaseSerDe extends Abstract
return new PutWritable(put);
}
- private byte [] serializeField(
- int i,
- Put put,
- List<? extends StructField> fields,
- List<Object> list,
- List<? extends StructField> declaredFields) throws IOException {
-
- // column mapping info
- ColumnMapping colMap = columnsMapping.get(i);
+ private static byte[] serializeKeyField(StructField keyField, Object keyValue,
+ StructField declaredKeyField, HBaseSerDeParameters serdeParams) throws IOException {
+ if (keyValue == null) {
+ // a null object, we do not serialize it
+ return null;
+ }
+
+ boolean writeBinary = serdeParams.getKeyColumnMapping().binaryStorage.get(0);
+ ObjectInspector keyFieldOI = keyField.getFieldObjectInspector();
+
+ if (!keyFieldOI.getCategory().equals(Category.PRIMITIVE) &&
+ declaredKeyField.getFieldObjectInspector().getCategory().equals(Category.PRIMITIVE)) {
+ // we always serialize the String type using the escaped algorithm for LazyString
+ return serialize(SerDeUtils.getJSONString(keyValue, keyFieldOI),
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false, serdeParams);
+ } else {
+ // use the serialization option switch to write primitive values as either a variable
+ // length UTF8 string or a fixed width bytes if serializing in binary format
+ return serialize(keyValue, keyFieldOI, 1, writeBinary, serdeParams);
+ }
- // Get the field objectInspector and the field object.
- ObjectInspector foi = fields.get(i).getFieldObjectInspector();
- Object f = (list == null ? null : list.get(i));
+ }
- if (f == null) {
+ private void serializeField(Put put, StructField field, Object value,
+ StructField declaredField, ColumnMapping colMap) throws IOException {
+ if (value == null) {
// a null object, we do not serialize it
- return null;
+ return;
}
+ // Get the field objectInspector and the field object.
+ ObjectInspector foi = field.getFieldObjectInspector();
+
// If the field corresponds to a column family in HBase
- if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+ if (colMap.qualifierName == null) {
MapObjectInspector moi = (MapObjectInspector) foi;
ObjectInspector koi = moi.getMapKeyObjectInspector();
ObjectInspector voi = moi.getMapValueObjectInspector();
- Map<?, ?> map = moi.getMap(f);
+ Map<?, ?> map = moi.getMap(value);
if (map == null) {
- return null;
+ return;
} else {
for (Map.Entry<?, ?> entry: map.entrySet()) {
// Get the Key
- serializeStream.reset();
-
// Map keys are required to be primitive and may be serialized in binary format
- boolean isNotNull = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0));
- if (!isNotNull) {
+ byte[] columnQualifierBytes = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0), serdeParams);
+ if (columnQualifierBytes == null) {
continue;
}
- // Get the column-qualifier
- byte [] columnQualifierBytes = new byte[serializeStream.getCount()];
- System.arraycopy(
- serializeStream.getData(), 0, columnQualifierBytes, 0, serializeStream.getCount());
-
- // Get the Value
- serializeStream.reset();
-
// Map values may be serialized in binary format when they are primitive and binary
// serialization is the option selected
- isNotNull = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1));
- if (!isNotNull) {
+ byte[] bytes = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1), serdeParams);
+ if (bytes == null) {
continue;
}
- byte [] value = new byte[serializeStream.getCount()];
- System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount());
- put.add(colMap.familyNameBytes, columnQualifierBytes, value);
+
+ put.add(colMap.familyNameBytes, columnQualifierBytes, bytes);
}
}
} else {
+ byte[] bytes = null;
// If the field that is passed in is NOT a primitive, and either the
// field is not declared (no schema was given at initialization), or
// the field is declared as a primitive in initialization, serialize
// the data to JSON string. Otherwise serialize the data in the
// delimited way.
- serializeStream.reset();
- boolean isNotNull;
if (!foi.getCategory().equals(Category.PRIMITIVE)
- && (declaredFields == null ||
- declaredFields.get(i).getFieldObjectInspector().getCategory()
- .equals(Category.PRIMITIVE) || useJSONSerialize)) {
-
+ && declaredField.getFieldObjectInspector().getCategory().equals(Category.PRIMITIVE)) {
// we always serialize the String type using the escaped algorithm for LazyString
- isNotNull = serialize(
- SerDeUtils.getJSONString(f, foi),
- PrimitiveObjectInspectorFactory.javaStringObjectInspector,
- 1, false);
+ bytes = serialize(SerDeUtils.getJSONString(value, foi),
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false, serdeParams);
} else {
// use the serialization option switch to write primitive values as either a variable
// length UTF8 string or a fixed width bytes if serializing in binary format
- isNotNull = serialize(f, foi, 1, colMap.binaryStorage.get(0));
+ bytes = serialize(value, foi, 1, colMap.binaryStorage.get(0), serdeParams);
}
- if (!isNotNull) {
- return null;
- }
- byte [] key = new byte[serializeStream.getCount()];
- System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount());
- if (i == iKey) {
- return key;
+
+ if (bytes == null) {
+ return;
}
- put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, key);
+
+ put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, bytes);
}
+ }
- return null;
+ private static byte[] getBytesFromStream(ByteStream.Output ss) {
+ byte [] buf = new byte[ss.getCount()];
+ System.arraycopy(ss.getData(), 0, buf, 0, ss.getCount());
+ return buf;
}
/*
@@ -733,33 +431,35 @@ public class HBaseSerDe extends Abstract
* @throws IOException On error in writing to the serialization stream.
* @return true On serializing a non-null object, otherwise false.
*/
- private boolean serialize(
- Object obj,
- ObjectInspector objInspector,
- int level,
- boolean writeBinary) throws IOException {
-
+ private static byte[] serialize(Object obj, ObjectInspector objInspector, int level,
+ boolean writeBinary, HBaseSerDeParameters serdeParams) throws IOException {
+ ByteStream.Output ss = new ByteStream.Output();
if (objInspector.getCategory() == Category.PRIMITIVE && writeBinary) {
- LazyUtils.writePrimitive(serializeStream, obj, (PrimitiveObjectInspector) objInspector);
- return true;
+ LazyUtils.writePrimitive(ss, obj, (PrimitiveObjectInspector) objInspector);
} else {
- return serialize(obj, objInspector, level);
+ if (false == serialize(obj, objInspector, level, serdeParams, ss)) {
+ return null;
+ }
}
+
+ return getBytesFromStream(ss);
}
- private boolean serialize(
+ private static boolean serialize(
Object obj,
ObjectInspector objInspector,
- int level) throws IOException {
+ int level, HBaseSerDeParameters serdeParams, ByteStream.Output ss) throws IOException {
+ byte[] separators = serdeParams.getSeparators();
+ boolean escaped = serdeParams.isEscaped();
+ byte escapeChar = serdeParams.getEscapeChar();
+ boolean[] needsEscape = serdeParams.getNeedsEscape();
switch (objInspector.getCategory()) {
- case PRIMITIVE: {
- LazyUtils.writePrimitiveUTF8(serializeStream, obj,
+ case PRIMITIVE:
+ LazyUtils.writePrimitiveUTF8(ss, obj,
(PrimitiveObjectInspector) objInspector, escaped, escapeChar, needsEscape);
return true;
- }
-
- case LIST: {
+ case LIST:
char separator = (char) separators[level];
ListObjectInspector loi = (ListObjectInspector)objInspector;
List<?> list = loi.getList(obj);
@@ -769,16 +469,14 @@ public class HBaseSerDe extends Abstract
} else {
for (int i = 0; i < list.size(); i++) {
if (i > 0) {
- serializeStream.write(separator);
+ ss.write(separator);
}
- serialize(list.get(i), eoi, level + 1);
+ serialize(list.get(i), eoi, level + 1, serdeParams, ss);
}
}
return true;
- }
-
- case MAP: {
- char separator = (char) separators[level];
+ case MAP:
+ char sep = (char) separators[level];
char keyValueSeparator = (char) separators[level+1];
MapObjectInspector moi = (MapObjectInspector) objInspector;
ObjectInspector koi = moi.getMapKeyObjectInspector();
@@ -793,38 +491,35 @@ public class HBaseSerDe extends Abstract
if (first) {
first = false;
} else {
- serializeStream.write(separator);
+ ss.write(sep);
}
- serialize(entry.getKey(), koi, level+2);
- serializeStream.write(keyValueSeparator);
- serialize(entry.getValue(), voi, level+2);
+ serialize(entry.getKey(), koi, level+2, serdeParams, ss);
+ ss.write(keyValueSeparator);
+ serialize(entry.getValue(), voi, level+2, serdeParams, ss);
}
}
return true;
- }
-
- case STRUCT: {
- char separator = (char)separators[level];
+ case STRUCT:
+ sep = (char)separators[level];
StructObjectInspector soi = (StructObjectInspector)objInspector;
List<? extends StructField> fields = soi.getAllStructFieldRefs();
- List<Object> list = soi.getStructFieldsDataAsList(obj);
+ list = soi.getStructFieldsDataAsList(obj);
if (list == null) {
return false;
} else {
for (int i = 0; i < list.size(); i++) {
if (i > 0) {
- serializeStream.write(separator);
+ ss.write(sep);
}
serialize(list.get(i), fields.get(i).getFieldObjectInspector(),
- level + 1);
+ level + 1, serdeParams, ss);
}
}
return true;
- }
+ default:
+ throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
}
-
- throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
}
/**
@@ -837,7 +532,7 @@ public class HBaseSerDe extends Abstract
int i = 0;
// find the hbase row key
- for (ColumnMapping colMap : columnsMapping) {
+ for (ColumnMapping colMap : serdeParams.getColumnMapping()) {
if (colMap.hbaseRowKey) {
break;
}
@@ -848,7 +543,7 @@ public class HBaseSerDe extends Abstract
.getAllStructFieldRefs().get(i).getFieldObjectInspector();
try {
- compositeKeyObj = compositeKeyClass.getDeclaredConstructor(
+ compositeKeyObj = serdeParams.getCompositeKeyClass().getDeclaredConstructor(
LazySimpleStructObjectInspector.class, Properties.class, Configuration.class)
.newInstance(
((LazySimpleStructObjectInspector) keyObjectInspector), tbl, conf);
@@ -865,33 +560,19 @@ public class HBaseSerDe extends Abstract
} catch (NoSuchMethodException e) {
// the constructor wasn't defined in the implementation class. Flag error
throw new SerDeException("Constructor not defined in composite key class ["
- + compositeKeyClass.getName() + "]", e);
+ + serdeParams.getCompositeKeyClass().getName() + "]", e);
}
}
/**
- * @return the useJSONSerialize
- */
- public boolean isUseJSONSerialize() {
- return useJSONSerialize;
- }
-
- /**
- * @param useJSONSerialize the useJSONSerialize to set
- */
- public void setUseJSONSerialize(boolean useJSONSerialize) {
- this.useJSONSerialize = useJSONSerialize;
- }
-
- /**
* @return 0-based offset of the key column within the table
*/
int getKeyColumnOffset() {
- return iKey;
+ return serdeParams.getKeyIndex();
}
List<Boolean> getStorageFormatOfCol(int colPos){
- return columnsMapping.get(colPos).binaryStorage;
+ return serdeParams.getColumnMapping().get(colPos).binaryStorage;
}
@Override
@@ -900,10 +581,6 @@ public class HBaseSerDe extends Abstract
return null;
}
- void setKeyColumnOffset() throws SerDeException {
- iKey = getRowKeyColumnOffset(columnsMapping);
- }
-
public static int getRowKeyColumnOffset(List<ColumnMapping> columnsMapping)
throws SerDeException {
Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java?rev=1579114&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java (added)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java Wed Mar 19 01:52:11 2014
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Text;
+
+/**
+ * HBaseSerDeParameters encapsulates SerDeParameters and additional configurations that are specific to
+ * HBaseSerDe.
+ *
+ */
+public class HBaseSerDeParameters {
+ private SerDeParameters serdeParams;
+
+ private String columnMappingString;
+ private List<ColumnMapping> columnMapping;
+ private boolean doColumnRegexMatching;
+
+ private long putTimestamp;
+
+ private Class<?> compositeKeyClass;
+ private int keyIndex;
+
+ void init(Configuration job, Properties tbl, String serdeName) throws SerDeException {
+ serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
+ putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP, "-1"));
+
+ String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
+ if (compKeyClass != null) {
+ try {
+ compositeKeyClass = job.getClassByName(compKeyClass);
+ } catch (ClassNotFoundException e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ // Read configuration parameters
+ columnMappingString = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, "true"));
+ // Parse and initialize the HBase columns mapping
+ columnMapping = HBaseSerDe.parseColumnsMapping(columnMappingString, doColumnRegexMatching);
+
+ // Build the type property string if not supplied
+ String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+ if (columnTypeProperty == null) {
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 0; i < columnMapping.size(); i++) {
+ if (sb.length() > 0) {
+ sb.append(":");
+ }
+
+ ColumnMapping colMap = columnMapping.get(i);
+
+ if (colMap.hbaseRowKey) {
+ // the row key column becomes a STRING
+ sb.append(serdeConstants.STRING_TYPE_NAME);
+ } else if (colMap.qualifierName == null) {
+ // a column family becomes a MAP
+ sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ","
+ + serdeConstants.STRING_TYPE_NAME + ">");
+ } else {
+ // an individual column becomes a STRING
+ sb.append(serdeConstants.STRING_TYPE_NAME);
+ }
+ }
+ tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, sb.toString());
+ }
+
+ if (columnMapping.size() != serdeParams.getColumnNames().size()) {
+ throw new SerDeException(serdeName + ": columns has " +
+ serdeParams.getColumnNames().size() +
+ " elements while hbase.columns.mapping has " +
+ columnMapping.size() + " elements" +
+ " (counting the key if implicit)");
+ }
+
+ // check that the mapping schema is right;
+ // check that the "column-family:" is mapped to Map<key,?>
+ // where key extends LazyPrimitive<?, ?> and thus has type Category.PRIMITIVE
+ for (int i = 0; i < columnMapping.size(); i++) {
+ ColumnMapping colMap = columnMapping.get(i);
+ if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+ TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
+ if ((typeInfo.getCategory() != Category.MAP) ||
+ (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
+ != Category.PRIMITIVE)) {
+
+ throw new SerDeException(
+ serdeName + ": hbase column family '" + colMap.familyName
+ + "' should be mapped to Map<? extends LazyPrimitive<?, ?>,?>, that is "
+ + "the Key for the map should be of primitive type, but is mapped to "
+ + typeInfo.getTypeName());
+ }
+ }
+ }
+
+ // Precondition: make sure this is done after the rest of the SerDe initialization is done.
+ String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
+ parseColumnStorageTypes(hbaseTableStorageType);
+ setKeyColumnOffset();
+ }
+
+ /*
+ * Utility method for parsing a string of the form '-,b,s,-,s:b,...' as a means of specifying
+ * whether to use a binary or a UTF string format to serialize and de-serialize primitive
+ * data types like boolean, byte, short, int, long, float, and double. This applies to
+ * regular columns and also to map column types which are associated with an HBase column
+ * family. For the map types, we apply the specification to the key or the value provided it
+ * is one of the above primitive types. The specifier is a colon-separated value of the form
+ * -:s, or b:b where we have 's', 'b', or '-' on either side of the colon. 's' is for string
+ * format storage, 'b' is for native fixed width byte oriented storage, and '-' uses the
+ * table level default.
+ *
+ * @param hbaseTableDefaultStorageType - the specification associated with the table property
+ * hbase.table.default.storage.type
+ * @throws SerDeException on parse error.
+ */
+
+ public void parseColumnStorageTypes(String hbaseTableDefaultStorageType)
+ throws SerDeException {
+
+ boolean tableBinaryStorage = false;
+
+ if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) {
+ if (hbaseTableDefaultStorageType.equals("binary")) {
+ tableBinaryStorage = true;
+ } else if (!hbaseTableDefaultStorageType.equals("string")) {
+ throw new SerDeException("Error: " + HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
+ " parameter must be specified as" +
+ " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
+ "' is not a valid specification for this table/serde property.");
+ }
+ }
+
+ // parse the string to determine column level storage type for primitive types
+ // 's' is for variable length string format storage
+ // 'b' is for fixed width binary storage of bytes
+ // '-' is for table storage type, which defaults to UTF8 string
+ // string data is always stored in the default escaped storage format; the data types
+ // byte, short, int, long, float, and double have a binary byte oriented storage option
+ List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
+
+ for (int i = 0; i < columnMapping.size(); i++) {
+
+ ColumnMapping colMap = columnMapping.get(i);
+ TypeInfo colType = columnTypes.get(i);
+ String mappingSpec = colMap.mappingSpec;
+ String [] mapInfo = mappingSpec.split("#");
+ String [] storageInfo = null;
+
+ if (mapInfo.length == 2) {
+ storageInfo = mapInfo[1].split(":");
+ }
+
+ if (storageInfo == null) {
+
+ // use the table default storage specification
+ if (colType.getCategory() == Category.PRIMITIVE) {
+ if (!colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else if (colType.getCategory() == Category.MAP) {
+ TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+ TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+ if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
+ !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
+ !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ } else if (storageInfo.length == 1) {
+ // we have a storage specification for a primitive column type
+ String storageOption = storageInfo[0];
+
+ if ((colType.getCategory() == Category.MAP) ||
+ !(storageOption.equals("-") || "string".startsWith(storageOption) ||
+ "binary".startsWith(storageOption))) {
+ throw new SerDeException("Error: A column storage specification is one of the following:"
+ + " '-', a prefix of 'string', or a prefix of 'binary'. "
+ + storageOption + " is not a valid storage option specification for "
+ + serdeParams.getColumnNames().get(i));
+ }
+
+ if (colType.getCategory() == Category.PRIMITIVE &&
+ !colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+
+ if ("-".equals(storageOption)) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(storageOption)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ } else if (storageInfo.length == 2) {
+ // we have a storage specification for a map column type
+
+ String keyStorage = storageInfo[0];
+ String valStorage = storageInfo[1];
+
+ if ((colType.getCategory() != Category.MAP) ||
+ !(keyStorage.equals("-") || "string".startsWith(keyStorage) ||
+ "binary".startsWith(keyStorage)) ||
+ !(valStorage.equals("-") || "string".startsWith(valStorage) ||
+ "binary".startsWith(valStorage))) {
+ throw new SerDeException("Error: To specify a valid column storage type for a Map"
+ + " column, use any two specifiers from '-', a prefix of 'string', "
+ + " and a prefix of 'binary' separated by a ':'."
+ + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the"
+ + " key and value parts of the Map<?,?> respectively."
+ + " Invalid storage specification for column "
+ + serdeParams.getColumnNames().get(i)
+ + "; " + storageInfo[0] + ":" + storageInfo[1]);
+ }
+
+ TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+ TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+ if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
+ !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+
+ if (keyStorage.equals("-")) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(keyStorage)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
+ !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ if (valStorage.equals("-")) {
+ colMap.binaryStorage.add(tableBinaryStorage);
+ } else if ("binary".startsWith(valStorage)) {
+ colMap.binaryStorage.add(true);
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+ } else {
+ colMap.binaryStorage.add(false);
+ }
+
+ if (colMap.binaryStorage.size() != 2) {
+ throw new SerDeException("Error: In parsing the storage specification for column "
+ + serdeParams.getColumnNames().get(i));
+ }
+
+ } else {
+ // error in storage specification
+ throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING + " storage specification "
+ + mappingSpec + " is not valid for column: "
+ + serdeParams.getColumnNames().get(i));
+ }
+ }
+ }
+
+ void setKeyColumnOffset() throws SerDeException {
+ setKeyIndex(getRowKeyColumnOffset(columnMapping));
+ }
+
+ public static int getRowKeyColumnOffset(List<ColumnMapping> columnsMapping)
+ throws SerDeException {
+
+ for (int i = 0; i < columnsMapping.size(); i++) {
+ ColumnMapping colMap = columnsMapping.get(i);
+
+ if (colMap.hbaseRowKey && colMap.familyName.equals(HBaseSerDe.HBASE_KEY_COL)) {
+ return i;
+ }
+ }
+
+ throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" +
+ " row key column.");
+ }
+
+ public StructTypeInfo getRowTypeInfo() {
+ return (StructTypeInfo) serdeParams.getRowTypeInfo();
+ }
+
+ public List<String> getColumnNames() {
+ return serdeParams.getColumnNames();
+ }
+
+ public byte[] getSeparators() {
+ return serdeParams.getSeparators();
+ }
+
+ public Text getNullSequence() {
+ return serdeParams.getNullSequence();
+ }
+
+ public boolean isLastColumnTakesRest() {
+ return serdeParams.isLastColumnTakesRest();
+ }
+
+ public boolean isEscaped() {
+ return serdeParams.isEscaped();
+ }
+
+ public byte getEscapeChar() {
+ return serdeParams.getEscapeChar();
+ }
+
+ public List<TypeInfo> getColumnTypes() {
+ return serdeParams.getColumnTypes();
+ }
+
+ public SerDeParameters getSerdeParams() {
+ return serdeParams;
+ }
+
+ public void setSerdeParams(SerDeParameters serdeParams) {
+ this.serdeParams = serdeParams;
+ }
+
+ public String getColumnMappingString() {
+ return columnMappingString;
+ }
+
+ public void setColumnMappingString(String columnMappingString) {
+ this.columnMappingString = columnMappingString;
+ }
+
+ public long getPutTimestamp() {
+ return putTimestamp;
+ }
+
+ public void setPutTimestamp(long putTimestamp) {
+ this.putTimestamp = putTimestamp;
+ }
+
+ public boolean isDoColumnRegexMatching() {
+ return doColumnRegexMatching;
+ }
+
+ public void setDoColumnRegexMatching(boolean doColumnRegexMatching) {
+ this.doColumnRegexMatching = doColumnRegexMatching;
+ }
+
+ public Class<?> getCompositeKeyClass() {
+ return compositeKeyClass;
+ }
+
+ public void setCompositeKeyClass(Class<?> compositeKeyClass) {
+ this.compositeKeyClass = compositeKeyClass;
+ }
+
+ public int getKeyIndex() {
+ return keyIndex;
+ }
+
+ public void setKeyIndex(int keyIndex) {
+ this.keyIndex = keyIndex;
+ }
+
+ public List<ColumnMapping> getColumnMapping() {
+ return columnMapping;
+ }
+
+ public ColumnMapping getKeyColumnMapping() {
+ return columnMapping.get(keyIndex);
+ }
+
+ public boolean[] getNeedsEscape() {
+ return serdeParams.getNeedsEscape();
+ }
+}
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1579114&r1=1579113&r2=1579114&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Wed Mar 19 01:52:11 2014
@@ -155,7 +155,7 @@ public class HBaseStorageHandler extends
Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();
String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
- List<ColumnMapping> columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+ List<ColumnMapping> columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, true);
HTableDescriptor tableDesc;
Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java?rev=1579114&r1=1579113&r2=1579114&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java Wed Mar 19 01:52:11 2014
@@ -466,7 +466,7 @@ public class TestLazyHBaseObject extends
List<ColumnMapping> columnsMapping = null;
try {
- columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColsMapping);
+ columnsMapping = parseColumnsMapping(hbaseColsMapping);
} catch (SerDeException e) {
fail(e.toString());
}
@@ -591,7 +591,7 @@ public class TestLazyHBaseObject extends
String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:,cfc:d";
try {
- columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColsMapping);
+ columnsMapping = parseColumnsMapping(hbaseColsMapping);
} catch (SerDeException e) {
fail(e.toString());
}
@@ -716,7 +716,7 @@ public class TestLazyHBaseObject extends
List<ColumnMapping> columnsMapping = null;
try {
- columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+ columnsMapping = parseColumnsMapping(hbaseColumnsMapping);
} catch (SerDeException sde) {
fail(sde.toString());
}
@@ -850,4 +850,19 @@ public class TestLazyHBaseObject extends
}
}
}
+
+ /**
+ * Parses the HBase columns mapping specifier to identify the column families, qualifiers
+ * and also caches the byte arrays corresponding to them. One of the Hive table
+ * columns maps to the HBase row key, by default the first column.
+ *
+ * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
+ * @return List<ColumnMapping> which contains the column mapping information by position
+ * @throws SerDeException
+ */
+ public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
+ throws SerDeException {
+ return HBaseSerDe.parseColumnsMapping(columnsMappingSpec, true);
+ }
+
}