You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/03/19 02:52:11 UTC

svn commit: r1579114 - in /hive/trunk/hbase-handler/src: java/org/apache/hadoop/hive/hbase/ test/org/apache/hadoop/hive/hbase/

Author: xuefu
Date: Wed Mar 19 01:52:11 2014
New Revision: 1579114

URL: http://svn.apache.org/r1579114
Log:
HIVE-6677: HBaseSerDe needs to be refactored (reviewed by Prasad)

Added:
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
Modified:
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1579114&r1=1579113&r2=1579114&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Wed Mar 19 01:52:11 2014
@@ -26,15 +26,12 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -45,9 +42,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -60,6 +54,7 @@ import org.apache.hadoop.io.Writable;
  * deserialize objects from an HBase table.
  */
 public class HBaseSerDe extends AbstractSerDe {
+  public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
 
   public static final String HBASE_COLUMNS_MAPPING = "hbase.columns.mapping";
   public static final String HBASE_TABLE_NAME = "hbase.table.name";
@@ -70,44 +65,27 @@ public class HBaseSerDe extends Abstract
   public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
   public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
   public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
-
-  /** Determines whether a regex matching should be done on the columns or not. Defaults to true.
-   *  <strong>WARNING: Note that currently this only supports the suffix wildcard .*</strong> **/
+  /**
+   *  Determines whether a regex matching should be done on the columns or not. Defaults to true.
+   *  <strong>WARNING: Note that currently this only supports the suffix wildcard .*</strong>
+   */
   public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching";
 
-  public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
-
   private ObjectInspector cachedObjectInspector;
-  private String hbaseColumnsMapping;
-  private boolean doColumnRegexMatching;
-  private List<ColumnMapping> columnsMapping;
-  private SerDeParameters serdeParams;
-  private boolean useJSONSerialize;
   private LazyHBaseRow cachedHBaseRow;
-  private final ByteStream.Output serializeStream = new ByteStream.Output();
-  private int iKey;
-  private long putTimestamp;
-  private Class<?> compositeKeyClass;
   private Object compositeKeyObj;
 
-  // used for serializing a field
-  private byte [] separators;     // the separators array
-  private boolean escaped;        // whether we need to escape the data when writing out
-  private byte escapeChar;        // which char to use as the escape char, e.g. '\\'
-  private boolean [] needsEscape; // which chars need to be escaped. This array should have size
-                                  // of 128. Negative byte values (or byte values >= 128) are
-                                  // never escaped.
+  private HBaseSerDeParameters serdeParams;
+
   @Override
   public String toString() {
     return getClass().toString()
         + "["
-        + hbaseColumnsMapping
+        + serdeParams.getColumnMappingString()
         + ":"
-        + ((StructTypeInfo) serdeParams.getRowTypeInfo())
-            .getAllStructFieldNames()
+        + serdeParams.getRowTypeInfo().getAllStructFieldNames()
         + ":"
-        + ((StructTypeInfo) serdeParams.getRowTypeInfo())
-            .getAllStructFieldTypeInfos() + "]";
+        + serdeParams.getRowTypeInfo().getAllStructFieldTypeInfos() + "]";
   }
 
   public HBaseSerDe() throws SerDeException {
@@ -120,8 +98,8 @@ public class HBaseSerDe extends Abstract
   @Override
   public void initialize(Configuration conf, Properties tbl)
       throws SerDeException {
-
-    initHBaseSerDeParameters(conf, tbl, getClass().getName());
+    serdeParams = new HBaseSerDeParameters();
+    serdeParams.init(conf, tbl, getClass().getName());
 
     cachedObjectInspector = LazyFactory.createLazyStructInspector(
         serdeParams.getColumnNames(),
@@ -132,10 +110,9 @@ public class HBaseSerDe extends Abstract
         serdeParams.isEscaped(),
         serdeParams.getEscapeChar());
 
-    cachedHBaseRow = new LazyHBaseRow(
-      (LazySimpleStructObjectInspector) cachedObjectInspector);
+    cachedHBaseRow = new LazyHBaseRow((LazySimpleStructObjectInspector) cachedObjectInspector);
 
-    if (compositeKeyClass != null) {
+    if (serdeParams.getCompositeKeyClass() != null) {
       // initialize the constructor of the composite key class with its object inspector
       initCompositeKeyClass(conf,tbl);
     }
@@ -146,7 +123,7 @@ public class HBaseSerDe extends Abstract
         + " columnTypes = "
         + serdeParams.getColumnTypes()
         + " hbaseColumnMapping = "
-        + hbaseColumnsMapping);
+        + serdeParams.getColumnMappingString());
     }
   }
 
@@ -156,20 +133,6 @@ public class HBaseSerDe extends Abstract
    * columns maps to the HBase row key, by default the first column.
    *
    * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
-   * @return List<ColumnMapping> which contains the column mapping information by position
-   * @throws SerDeException
-   */
-  public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
-      throws SerDeException {
-    return parseColumnsMapping(columnsMappingSpec, true);
-  }
-
-  /**
-   * Parses the HBase columns mapping specifier to identify the column families, qualifiers
-   * and also caches the byte arrays corresponding to them. One of the Hive table
-   * columns maps to the HBase row key, by default the first column.
-   *
-   * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
    * @param doColumnRegexMatching whether to do a regex matching on the columns or not
    * @return List<ColumnMapping> which contains the column mapping information by position
    * @throws SerDeException
@@ -261,186 +224,6 @@ public class HBaseSerDe extends Abstract
     return columnsMapping;
   }
 
-  /*
-   * Utility method for parsing a string of the form '-,b,s,-,s:b,...' as a means of specifying
-   * whether to use a binary or an UTF string format to serialize and de-serialize primitive
-   * data types like boolean, byte, short, int, long, float, and double. This applies to
-   * regular columns and also to map column types which are associated with an HBase column
-   * family. For the map types, we apply the specification to the key or the value provided it
-   * is one of the above primitive types. The specifier is a colon separated value of the form
-   * -:s, or b:b where we have 's', 'b', or '-' on either side of the colon. 's' is for string
-   * format storage, 'b' is for native fixed width byte oriented storage, and '-' uses the
-   * table level default.
-   *
-   * @param hbaseTableDefaultStorageType - the specification associated with the table property
-   *        hbase.table.default.storage.type
-   * @throws SerDeException on parse error.
-   */
-
-  private void parseColumnStorageTypes(String hbaseTableDefaultStorageType)
-      throws SerDeException {
-
-    boolean tableBinaryStorage = false;
-
-    if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) {
-      if (hbaseTableDefaultStorageType.equals("binary")) {
-        tableBinaryStorage = true;
-      } else if (!hbaseTableDefaultStorageType.equals("string")) {
-        throw new SerDeException("Error: " + HBASE_TABLE_DEFAULT_STORAGE_TYPE +
-            " parameter must be specified as" +
-            " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
-            "' is not a valid specification for this table/serde property.");
-      }
-    }
-
-    // parse the string to determine column level storage type for primitive types
-    // 's' is for variable length string format storage
-    // 'b' is for fixed width binary storage of bytes
-    // '-' is for table storage type, which defaults to UTF8 string
-    // string data is always stored in the default escaped storage format; the data types
-    // byte, short, int, long, float, and double have a binary byte oriented storage option
-    List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
-
-    for (int i = 0; i < columnsMapping.size(); i++) {
-
-      ColumnMapping colMap = columnsMapping.get(i);
-      TypeInfo colType = columnTypes.get(i);
-      String mappingSpec = colMap.mappingSpec;
-      String [] mapInfo = mappingSpec.split("#");
-      String [] storageInfo = null;
-
-      if (mapInfo.length == 2) {
-        storageInfo = mapInfo[1].split(":");
-      }
-
-      if (storageInfo == null) {
-
-        // use the table default storage specification
-        if (colType.getCategory() == Category.PRIMITIVE) {
-          if (!colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-            colMap.binaryStorage.add(tableBinaryStorage);
-          } else {
-            colMap.binaryStorage.add(false);
-          }
-        } else if (colType.getCategory() == Category.MAP) {
-          TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
-          TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
-
-          if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
-              !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-            colMap.binaryStorage.add(tableBinaryStorage);
-          } else {
-            colMap.binaryStorage.add(false);
-          }
-
-          if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
-              !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-            colMap.binaryStorage.add(tableBinaryStorage);
-          } else {
-            colMap.binaryStorage.add(false);
-          }
-        } else {
-          colMap.binaryStorage.add(false);
-        }
-
-      } else if (storageInfo.length == 1) {
-        // we have a storage specification for a primitive column type
-        String storageOption = storageInfo[0];
-
-        if ((colType.getCategory() == Category.MAP) ||
-            !(storageOption.equals("-") || "string".startsWith(storageOption) ||
-                "binary".startsWith(storageOption))) {
-          throw new SerDeException("Error: A column storage specification is one of the following:"
-              + " '-', a prefix of 'string', or a prefix of 'binary'. "
-              + storageOption + " is not a valid storage option specification for "
-              + serdeParams.getColumnNames().get(i));
-        }
-
-        if (colType.getCategory() == Category.PRIMITIVE &&
-            !colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-
-          if ("-".equals(storageOption)) {
-            colMap.binaryStorage.add(tableBinaryStorage);
-          } else if ("binary".startsWith(storageOption)) {
-            colMap.binaryStorage.add(true);
-          } else {
-              colMap.binaryStorage.add(false);
-          }
-        } else {
-          colMap.binaryStorage.add(false);
-        }
-
-      } else if (storageInfo.length == 2) {
-        // we have a storage specification for a map column type
-
-        String keyStorage = storageInfo[0];
-        String valStorage = storageInfo[1];
-
-        if ((colType.getCategory() != Category.MAP) ||
-            !(keyStorage.equals("-") || "string".startsWith(keyStorage) ||
-                "binary".startsWith(keyStorage)) ||
-            !(valStorage.equals("-") || "string".startsWith(valStorage) ||
-                "binary".startsWith(valStorage))) {
-          throw new SerDeException("Error: To specify a valid column storage type for a Map"
-              + " column, use any two specifiers from '-', a prefix of 'string', "
-              + " and a prefix of 'binary' separated by a ':'."
-              + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the"
-              + " key and value parts of the Map<?,?> respectively."
-              + " Invalid storage specification for column "
-              + serdeParams.getColumnNames().get(i)
-              + "; " + storageInfo[0] + ":" + storageInfo[1]);
-        }
-
-        TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
-        TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
-
-        if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
-            !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-
-          if (keyStorage.equals("-")) {
-            colMap.binaryStorage.add(tableBinaryStorage);
-          } else if ("binary".startsWith(keyStorage)) {
-            colMap.binaryStorage.add(true);
-          } else {
-            colMap.binaryStorage.add(false);
-          }
-        } else {
-          colMap.binaryStorage.add(false);
-        }
-
-        if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
-            !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
-          if (valStorage.equals("-")) {
-            colMap.binaryStorage.add(tableBinaryStorage);
-          } else if ("binary".startsWith(valStorage)) {
-            colMap.binaryStorage.add(true);
-          } else {
-            colMap.binaryStorage.add(false);
-          }
-        } else {
-          colMap.binaryStorage.add(false);
-        }
-
-        if (colMap.binaryStorage.size() != 2) {
-          throw new SerDeException("Error: In parsing the storage specification for column "
-              + serdeParams.getColumnNames().get(i));
-        }
-
-      } else {
-        // error in storage specification
-        throw new SerDeException("Error: " + HBASE_COLUMNS_MAPPING + " storage specification "
-            + mappingSpec + " is not valid for column: "
-            + serdeParams.getColumnNames().get(i));
-      }
-    }
-  }
-
-
-  public static boolean isRowKeyColumn(String hbaseColumnName) {
-    return hbaseColumnName.equals(HBASE_KEY_COL);
-  }
-
-
   static class ColumnMapping {
 
     ColumnMapping() {
@@ -458,97 +241,6 @@ public class HBaseSerDe extends Abstract
     byte[] qualifierPrefixBytes;
   }
 
-  private void initHBaseSerDeParameters(
-      Configuration job, Properties tbl, String serdeName)
-    throws SerDeException {
-
-    // Read configuration parameters
-    hbaseColumnsMapping = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
-    putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
-
-    doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true"));
-
-    String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
-
-    if (compKeyClass != null) {
-      try {
-        compositeKeyClass = job.getClassByName(compKeyClass);
-      } catch (ClassNotFoundException e) {
-        throw new SerDeException(e);
-      }
-    }
-
-    // Parse and initialize the HBase columns mapping
-    columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
-
-    // Build the type property string if not supplied
-    if (columnTypeProperty == null) {
-      StringBuilder sb = new StringBuilder();
-
-      for (int i = 0; i < columnsMapping.size(); i++) {
-        if (sb.length() > 0) {
-          sb.append(":");
-        }
-
-        ColumnMapping colMap = columnsMapping.get(i);
-
-        if (colMap.hbaseRowKey) {
-          // the row key column becomes a STRING
-          sb.append(serdeConstants.STRING_TYPE_NAME);
-        } else if (colMap.qualifierName == null)  {
-          // a column family become a MAP
-          sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ","
-              + serdeConstants.STRING_TYPE_NAME + ">");
-        } else {
-          // an individual column becomes a STRING
-          sb.append(serdeConstants.STRING_TYPE_NAME);
-        }
-      }
-      tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, sb.toString());
-    }
-
-    serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
-
-    if (columnsMapping.size() != serdeParams.getColumnNames().size()) {
-      throw new SerDeException(serdeName + ": columns has " +
-        serdeParams.getColumnNames().size() +
-        " elements while hbase.columns.mapping has " +
-        columnsMapping.size() + " elements" +
-        " (counting the key if implicit)");
-    }
-
-    separators = serdeParams.getSeparators();
-    escaped = serdeParams.isEscaped();
-    escapeChar = serdeParams.getEscapeChar();
-    needsEscape = serdeParams.getNeedsEscape();
-
-    // check that the mapping schema is right;
-    // check that the "column-family:" is mapped to  Map<key,?>
-    // where key extends LazyPrimitive<?, ?> and thus has type Category.PRIMITIVE
-    for (int i = 0; i < columnsMapping.size(); i++) {
-      ColumnMapping colMap = columnsMapping.get(i);
-      if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
-        TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
-        if ((typeInfo.getCategory() != Category.MAP) ||
-          (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
-            !=  Category.PRIMITIVE)) {
-
-          throw new SerDeException(
-            serdeName + ": hbase column family '" + colMap.familyName
-            + "' should be mapped to Map<? extends LazyPrimitive<?, ?>,?>, that is "
-            + "the Key for the map should be of primitive type, but is mapped to "
-            + typeInfo.getTypeName());
-        }
-      }
-    }
-
-    // Precondition: make sure this is done after the rest of the SerDe initialization is done.
-    String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
-    parseColumnStorageTypes(hbaseTableStorageType);
-    setKeyColumnOffset();
-  }
-
   /**
    * Deserialize a row from the HBase Result writable to a LazyObject
    * @param result the HBase Result Writable containing the row
@@ -557,13 +249,12 @@ public class HBaseSerDe extends Abstract
    */
   @Override
   public Object deserialize(Writable result) throws SerDeException {
-
     if (!(result instanceof ResultWritable)) {
       throw new SerDeException(getClass().getName() + ": expects ResultWritable!");
     }
 
-    cachedHBaseRow.init(((ResultWritable) result).getResult(), columnsMapping, compositeKeyObj);
-
+    cachedHBaseRow.init(((ResultWritable) result).getResult(), serdeParams.getColumnMapping(),
+        compositeKeyObj);
     return cachedHBaseRow;
   }
 
@@ -591,34 +282,43 @@ public class HBaseSerDe extends Abstract
     List<? extends StructField> fields = soi.getAllStructFieldRefs();
     List<Object> list = soi.getStructFieldsDataAsList(obj);
     List<? extends StructField> declaredFields =
-      (serdeParams.getRowTypeInfo() != null &&
-        ((StructTypeInfo) serdeParams.getRowTypeInfo())
-        .getAllStructFieldNames().size() > 0) ?
-      ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs()
-      : null;
-
-    Put put = null;
+        ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs();
 
+    int iKey = serdeParams.getKeyIndex();
+    StructField field = fields.get(iKey);
+    Object value = list.get(iKey);
+    StructField declaredField = declaredFields.get(iKey);
+    byte[] key;
     try {
-      byte [] key = serializeField(iKey, null, fields, list, declaredFields);
+      key = serializeKeyField(field, value, declaredField, serdeParams);
+    } catch (IOException ex) {
+      throw new SerDeException(ex);
+    }
 
-      if (key == null) {
-        throw new SerDeException("HBase row key cannot be NULL");
-      }
+    if (key == null) {
+      throw new SerDeException("HBase row key cannot be NULL");
+    }
 
-      if(putTimestamp >= 0) {
-        put = new Put(key,putTimestamp);
-      } else {
-        put = new Put(key);
-      }
+    Put put = null;
+    long putTimestamp = serdeParams.getPutTimestamp();
+    if(putTimestamp >= 0) {
+      put = new Put(key, putTimestamp);
+    } else {
+      put = new Put(key);
+    }
 
+    try {
       // Serialize each field
       for (int i = 0; i < fields.size(); i++) {
         if (i == iKey) {
-          // already processed the key above
           continue;
         }
-        serializeField(i, put, fields, list, declaredFields);
+
+        field = fields.get(i);
+        value = list.get(i);
+        declaredField = declaredFields.get(i);
+        ColumnMapping colMap = serdeParams.getColumnMapping().get(i);
+        serializeField(put, field, value, declaredField, colMap);
       }
     } catch (IOException e) {
       throw new SerDeException(e);
@@ -627,99 +327,97 @@ public class HBaseSerDe extends Abstract
     return new PutWritable(put);
   }
 
-  private byte [] serializeField(
-    int i,
-    Put put,
-    List<? extends StructField> fields,
-    List<Object> list,
-    List<? extends StructField> declaredFields) throws IOException {
-
-    // column mapping info
-    ColumnMapping colMap = columnsMapping.get(i);
+  private static byte[] serializeKeyField(StructField keyField, Object keyValue,
+      StructField declaredKeyField, HBaseSerDeParameters serdeParams) throws IOException {
+	  if (keyValue == null) {
+		  // a null object, we do not serialize it
+		  return null;
+	  }
+
+    boolean writeBinary = serdeParams.getKeyColumnMapping().binaryStorage.get(0);
+	  ObjectInspector keyFieldOI = keyField.getFieldObjectInspector();
+
+	  if (!keyFieldOI.getCategory().equals(Category.PRIMITIVE) &&
+			  declaredKeyField.getFieldObjectInspector().getCategory().equals(Category.PRIMITIVE)) {
+		  // we always serialize the String type using the escaped algorithm for LazyString
+	    return serialize(SerDeUtils.getJSONString(keyValue, keyFieldOI),
+	        PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false, serdeParams);
+	  } else {
+		  // use the serialization option switch to write primitive values as either a variable
+		  // length UTF8 string or a fixed width bytes if serializing in binary format
+	    return serialize(keyValue, keyFieldOI, 1, writeBinary, serdeParams);
+	  }
 
-    // Get the field objectInspector and the field object.
-    ObjectInspector foi = fields.get(i).getFieldObjectInspector();
-    Object f = (list == null ? null : list.get(i));
+  }
 
-    if (f == null) {
+  private void serializeField(Put put, StructField field, Object value,
+    StructField declaredField, ColumnMapping colMap) throws IOException {
+    if (value == null) {
       // a null object, we do not serialize it
-      return null;
+      return;
     }
 
+    // Get the field objectInspector and the field object.
+    ObjectInspector foi = field.getFieldObjectInspector();
+
     // If the field corresponds to a column family in HBase
-    if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+    if (colMap.qualifierName == null) {
       MapObjectInspector moi = (MapObjectInspector) foi;
       ObjectInspector koi = moi.getMapKeyObjectInspector();
       ObjectInspector voi = moi.getMapValueObjectInspector();
 
-      Map<?, ?> map = moi.getMap(f);
+      Map<?, ?> map = moi.getMap(value);
       if (map == null) {
-        return null;
+        return;
       } else {
         for (Map.Entry<?, ?> entry: map.entrySet()) {
           // Get the Key
-          serializeStream.reset();
-
           // Map keys are required to be primitive and may be serialized in binary format
-          boolean isNotNull = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0));
-          if (!isNotNull) {
+          byte[] columnQualifierBytes = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0), serdeParams);
+          if (columnQualifierBytes == null) {
             continue;
           }
 
-          // Get the column-qualifier
-          byte [] columnQualifierBytes = new byte[serializeStream.getCount()];
-          System.arraycopy(
-              serializeStream.getData(), 0, columnQualifierBytes, 0, serializeStream.getCount());
-
-          // Get the Value
-          serializeStream.reset();
-
           // Map values may be serialized in binary format when they are primitive and binary
           // serialization is the option selected
-          isNotNull = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1));
-          if (!isNotNull) {
+          byte[] bytes = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1), serdeParams);
+          if (bytes == null) {
             continue;
           }
-          byte [] value = new byte[serializeStream.getCount()];
-          System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount());
-          put.add(colMap.familyNameBytes, columnQualifierBytes, value);
+
+          put.add(colMap.familyNameBytes, columnQualifierBytes, bytes);
         }
       }
     } else {
+      byte[] bytes = null;
       // If the field that is passed in is NOT a primitive, and either the
       // field is not declared (no schema was given at initialization), or
       // the field is declared as a primitive in initialization, serialize
       // the data to JSON string.  Otherwise serialize the data in the
       // delimited way.
-      serializeStream.reset();
-      boolean isNotNull;
       if (!foi.getCategory().equals(Category.PRIMITIVE)
-          && (declaredFields == null ||
-              declaredFields.get(i).getFieldObjectInspector().getCategory()
-              .equals(Category.PRIMITIVE) || useJSONSerialize)) {
-
+          && declaredField.getFieldObjectInspector().getCategory().equals(Category.PRIMITIVE)) {
         // we always serialize the String type using the escaped algorithm for LazyString
-        isNotNull = serialize(
-            SerDeUtils.getJSONString(f, foi),
-            PrimitiveObjectInspectorFactory.javaStringObjectInspector,
-            1, false);
+        bytes = serialize(SerDeUtils.getJSONString(value, foi),
+            PrimitiveObjectInspectorFactory.javaStringObjectInspector, 1, false, serdeParams);
       } else {
         // use the serialization option switch to write primitive values as either a variable
         // length UTF8 string or a fixed width bytes if serializing in binary format
-        isNotNull = serialize(f, foi, 1, colMap.binaryStorage.get(0));
+        bytes = serialize(value, foi, 1, colMap.binaryStorage.get(0), serdeParams);
       }
-      if (!isNotNull) {
-        return null;
-      }
-      byte [] key = new byte[serializeStream.getCount()];
-      System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount());
-      if (i == iKey) {
-        return key;
+
+      if (bytes == null) {
+        return;
       }
-      put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, key);
+
+      put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, bytes);
     }
+  }
 
-    return null;
+  private static byte[] getBytesFromStream(ByteStream.Output ss) {
+    byte [] buf = new byte[ss.getCount()];
+    System.arraycopy(ss.getData(), 0, buf, 0, ss.getCount());
+    return buf;
   }
 
   /*
@@ -733,33 +431,35 @@ public class HBaseSerDe extends Abstract
    * @throws IOException  On error in writing to the serialization stream.
    * @return true         On serializing a non-null object, otherwise false.
    */
-  private boolean serialize(
-      Object obj,
-      ObjectInspector objInspector,
-      int level,
-      boolean writeBinary) throws IOException {
-
+  private static byte[] serialize(Object obj, ObjectInspector objInspector, int level,
+      boolean writeBinary, HBaseSerDeParameters serdeParams) throws IOException {
+    ByteStream.Output ss = new ByteStream.Output();
     if (objInspector.getCategory() == Category.PRIMITIVE && writeBinary) {
-      LazyUtils.writePrimitive(serializeStream, obj, (PrimitiveObjectInspector) objInspector);
-      return true;
+      LazyUtils.writePrimitive(ss, obj, (PrimitiveObjectInspector) objInspector);
     } else {
-      return serialize(obj, objInspector, level);
+      if (false == serialize(obj, objInspector, level, serdeParams, ss)) {
+        return null;
+      }
     }
+
+    return getBytesFromStream(ss);
   }
 
-  private boolean serialize(
+  private static boolean serialize(
       Object obj,
       ObjectInspector objInspector,
-      int level) throws IOException {
+      int level, HBaseSerDeParameters serdeParams, ByteStream.Output ss) throws IOException {
 
+    byte[] separators = serdeParams.getSeparators();
+    boolean escaped = serdeParams.isEscaped();
+    byte escapeChar = serdeParams.getEscapeChar();
+    boolean[] needsEscape = serdeParams.getNeedsEscape();
     switch (objInspector.getCategory()) {
-      case PRIMITIVE: {
-        LazyUtils.writePrimitiveUTF8(serializeStream, obj,
+      case PRIMITIVE:
+        LazyUtils.writePrimitiveUTF8(ss, obj,
             (PrimitiveObjectInspector) objInspector, escaped, escapeChar, needsEscape);
         return true;
-      }
-
-      case LIST: {
+      case LIST:
         char separator = (char) separators[level];
         ListObjectInspector loi = (ListObjectInspector)objInspector;
         List<?> list = loi.getList(obj);
@@ -769,16 +469,14 @@ public class HBaseSerDe extends Abstract
         } else {
           for (int i = 0; i < list.size(); i++) {
             if (i > 0) {
-              serializeStream.write(separator);
+              ss.write(separator);
             }
-            serialize(list.get(i), eoi, level + 1);
+            serialize(list.get(i), eoi, level + 1, serdeParams, ss);
           }
         }
         return true;
-      }
-
-      case MAP: {
-        char separator = (char) separators[level];
+      case MAP:
+        char sep = (char) separators[level];
         char keyValueSeparator = (char) separators[level+1];
         MapObjectInspector moi = (MapObjectInspector) objInspector;
         ObjectInspector koi = moi.getMapKeyObjectInspector();
@@ -793,38 +491,35 @@ public class HBaseSerDe extends Abstract
             if (first) {
               first = false;
             } else {
-              serializeStream.write(separator);
+              ss.write(sep);
             }
-            serialize(entry.getKey(), koi, level+2);
-            serializeStream.write(keyValueSeparator);
-            serialize(entry.getValue(), voi, level+2);
+            serialize(entry.getKey(), koi, level+2, serdeParams, ss);
+            ss.write(keyValueSeparator);
+            serialize(entry.getValue(), voi, level+2, serdeParams, ss);
           }
         }
         return true;
-      }
-
-      case STRUCT: {
-        char separator = (char)separators[level];
+      case STRUCT:
+        sep = (char)separators[level];
         StructObjectInspector soi = (StructObjectInspector)objInspector;
         List<? extends StructField> fields = soi.getAllStructFieldRefs();
-        List<Object> list = soi.getStructFieldsDataAsList(obj);
+        list = soi.getStructFieldsDataAsList(obj);
         if (list == null) {
           return false;
         } else {
           for (int i = 0; i < list.size(); i++) {
             if (i > 0) {
-              serializeStream.write(separator);
+              ss.write(sep);
             }
 
             serialize(list.get(i), fields.get(i).getFieldObjectInspector(),
-                level + 1);
+                level + 1, serdeParams, ss);
           }
         }
         return true;
-      }
+      default:
+        throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
     }
-
-    throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
   }
 
   /**
@@ -837,7 +532,7 @@ public class HBaseSerDe extends Abstract
     int i = 0;
 
     // find the hbase row key
-    for (ColumnMapping colMap : columnsMapping) {
+    for (ColumnMapping colMap : serdeParams.getColumnMapping()) {
       if (colMap.hbaseRowKey) {
         break;
       }
@@ -848,7 +543,7 @@ public class HBaseSerDe extends Abstract
         .getAllStructFieldRefs().get(i).getFieldObjectInspector();
 
     try {
-      compositeKeyObj = compositeKeyClass.getDeclaredConstructor(
+      compositeKeyObj = serdeParams.getCompositeKeyClass().getDeclaredConstructor(
             LazySimpleStructObjectInspector.class, Properties.class, Configuration.class)
             .newInstance(
                 ((LazySimpleStructObjectInspector) keyObjectInspector), tbl, conf);
@@ -865,33 +560,19 @@ public class HBaseSerDe extends Abstract
     } catch (NoSuchMethodException e) {
       // the constructor wasn't defined in the implementation class. Flag error
       throw new SerDeException("Constructor not defined in composite key class ["
-          + compositeKeyClass.getName() + "]", e);
+          + serdeParams.getCompositeKeyClass().getName() + "]", e);
     }
   }
 
   /**
-   * @return the useJSONSerialize
-   */
-  public boolean isUseJSONSerialize() {
-    return useJSONSerialize;
-  }
-
-  /**
-   * @param useJSONSerialize the useJSONSerialize to set
-   */
-  public void setUseJSONSerialize(boolean useJSONSerialize) {
-    this.useJSONSerialize = useJSONSerialize;
-  }
-
-  /**
    * @return 0-based offset of the key column within the table
    */
   int getKeyColumnOffset() {
-    return iKey;
+    return serdeParams.getKeyIndex();
   }
 
   List<Boolean> getStorageFormatOfCol(int colPos){
-    return columnsMapping.get(colPos).binaryStorage;
+    return serdeParams.getColumnMapping().get(colPos).binaryStorage;
   }
 
   @Override
@@ -900,10 +581,6 @@ public class HBaseSerDe extends Abstract
     return null;
   }
 
-  void setKeyColumnOffset() throws SerDeException {
-    iKey = getRowKeyColumnOffset(columnsMapping);
-  }
-
   public static int getRowKeyColumnOffset(List<ColumnMapping> columnsMapping)
       throws SerDeException {
 

Added: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java?rev=1579114&view=auto
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java (added)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java Wed Mar 19 01:52:11 2014
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Text;
+
+/**
+ * HBaseSerDeParameters encapsulates SerDeParameters and the additional configuration that is
+ * specific to HBaseSerDe.
+ *
+ */
+public class HBaseSerDeParameters {
+  private SerDeParameters serdeParams;
+
+  private String columnMappingString;
+  private List<ColumnMapping> columnMapping;
+  private boolean doColumnRegexMatching;
+
+  private long putTimestamp;
+
+  private Class<?> compositeKeyClass;
+  private int keyIndex;
+
+  /**
+   * Loads the HBase SerDe settings from the table properties. A missing columns.types property
+   * is derived from the column mapping and stored in tbl before the Lazy parameters are built.
+   *
+   * @param job configuration used to load the composite key class
+   * @param tbl table properties; columns.types is set on it when absent
+   * @param serdeName SerDe name used in error messages
+   * @throws SerDeException if the key class is unknown or the mapping does not fit the columns
+   */
+  void init(Configuration job, Properties tbl, String serdeName) throws SerDeException {
+    putTimestamp = Long.parseLong(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP, "-1"));
+
+    String compKeyClass = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
+    if (compKeyClass != null) {
+      try {
+        compositeKeyClass = job.getClassByName(compKeyClass);
+      } catch (ClassNotFoundException e) {
+        throw new SerDeException(e);
+      }
+    }
+
+    // Read configuration parameters
+    columnMappingString = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    doColumnRegexMatching =
+        Boolean.parseBoolean(tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, "true"));
+    // Parse and initialize the HBase columns mapping
+    columnMapping = HBaseSerDe.parseColumnsMapping(columnMappingString, doColumnRegexMatching);
+
+    // Build the type property string if not supplied
+    if (tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES) == null) {
+      StringBuilder sb = new StringBuilder();
+      for (ColumnMapping colMap : columnMapping) {
+        if (sb.length() > 0) {
+          sb.append(":");
+        }
+        if (!colMap.hbaseRowKey && colMap.qualifierName == null) {
+          // a column family becomes a MAP
+          sb.append(serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ","
+              + serdeConstants.STRING_TYPE_NAME + ">");
+        } else {
+          // the row key column and an individual column each become a STRING
+          sb.append(serdeConstants.STRING_TYPE_NAME);
+        }
+      }
+      tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, sb.toString());
+    }
+
+    // Built only now, so that the column types derived above are already present in tbl.
+    serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
+
+    if (columnMapping.size() != serdeParams.getColumnNames().size()) {
+      throw new SerDeException(serdeName + ": columns has " + serdeParams.getColumnNames().size()
+          + " elements while hbase.columns.mapping has " + columnMapping.size() + " elements"
+          + " (counting the key if implicit)");
+    }
+
+    // check that the mapping schema is right;
+    // check that the "column-family:" is mapped to  Map<key,?>
+    // where key extends LazyPrimitive<?, ?> and thus has type Category.PRIMITIVE
+    for (int i = 0; i < columnMapping.size(); i++) {
+      ColumnMapping colMap = columnMapping.get(i);
+      if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+        TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
+        if (typeInfo.getCategory() != Category.MAP
+            || ((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory() != Category.PRIMITIVE) {
+          throw new SerDeException(
+            serdeName + ": hbase column family '" + colMap.familyName
+            + "' should be mapped to Map<? extends LazyPrimitive<?, ?>,?>, that is "
+            + "the Key for the map should be of primitive type, but is mapped to "
+            + typeInfo.getTypeName());
+        }
+      }
+    }
+
+    // Precondition: make sure this is done after the rest of the SerDe initialization is done.
+    String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
+    parseColumnStorageTypes(hbaseTableStorageType);
+    setKeyColumnOffset();
+  }
+
+  /**
+   * Decides, for every mapped column, whether primitive values are stored in binary or in
+   * UTF-8 string form, and appends the outcome to that column's ColumnMapping.binaryStorage:
+   * one flag for a non-map column, two flags (key, then value) for a map column. The
+   * per-column specifier is the part after '#' in the column's mapping spec: a single option,
+   * or for a map two options separated by a colon (e.g. -:s or b:b). An option is '-' (use the
+   * table level default) or a prefix of 'string' or of 'binary' ('b' is native fixed width byte
+   * oriented storage). String typed and non-primitive values always get the flag false.
+   * NOTE(review): flags are only appended, never cleared, so this looks intended to run once
+   * per set of column mappings; confirm before calling it a second time.
+   *
+   * @param hbaseTableDefaultStorageType - value of the table property
+   *        hbase.table.default.storage.type: 'string', 'binary', or null/empty for string
+   * @throws SerDeException on parse error.
+   */
+
+  public void parseColumnStorageTypes(String hbaseTableDefaultStorageType)
+      throws SerDeException {
+
+    boolean tableBinaryStorage = false;
+
+    if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) {
+      if (hbaseTableDefaultStorageType.equals("binary")) {
+        tableBinaryStorage = true;
+      } else if (!hbaseTableDefaultStorageType.equals("string")) {
+        throw new SerDeException("Error: " + HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE +
+            " parameter must be specified as" +
+            " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
+            "' is not a valid specification for this table/serde property.");
+      }
+    }
+
+    // parse the string to determine column level storage type for primitive types
+    // 's' is for variable length string format storage
+    // 'b' is for fixed width binary storage of bytes
+    // '-' is for table storage type, which defaults to UTF8 string
+    // string data is always stored in the default escaped storage format; the data types
+    // byte, short, int, long, float, and double have a binary byte oriented storage option
+    List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
+
+    for (int i = 0; i < columnMapping.size(); i++) {
+
+      ColumnMapping colMap = columnMapping.get(i);
+      TypeInfo colType = columnTypes.get(i);
+      String mappingSpec = colMap.mappingSpec;
+      String [] mapInfo = mappingSpec.split("#");
+      String [] storageInfo = null;
+
+      if (mapInfo.length == 2) {
+        storageInfo = mapInfo[1].split(":");
+      }
+
+      if (storageInfo == null) {
+
+        // use the table default storage specification
+        if (colType.getCategory() == Category.PRIMITIVE) {
+          if (!colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
+        } else if (colType.getCategory() == Category.MAP) {
+          TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+          TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+          if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
+              !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
+
+          if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
+              !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
+        } else {
+          colMap.binaryStorage.add(false);
+        }
+
+      } else if (storageInfo.length == 1) {
+        // we have a storage specification for a primitive column type
+        String storageOption = storageInfo[0];
+
+        if ((colType.getCategory() == Category.MAP) ||
+            !(storageOption.equals("-") || "string".startsWith(storageOption) ||
+                "binary".startsWith(storageOption))) {
+          throw new SerDeException("Error: A column storage specification is one of the following:"
+              + " '-', a prefix of 'string', or a prefix of 'binary'. "
+              + storageOption + " is not a valid storage option specification for "
+              + serdeParams.getColumnNames().get(i));
+        }
+
+        if (colType.getCategory() == Category.PRIMITIVE &&
+            !colType.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+
+          if ("-".equals(storageOption)) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else if ("binary".startsWith(storageOption)) {
+            colMap.binaryStorage.add(true);
+          } else {
+              colMap.binaryStorage.add(false);
+          }
+        } else {
+          colMap.binaryStorage.add(false);
+        }
+
+      } else if (storageInfo.length == 2) {
+        // we have a storage specification for a map column type
+
+        String keyStorage = storageInfo[0];
+        String valStorage = storageInfo[1];
+
+        if ((colType.getCategory() != Category.MAP) ||
+            !(keyStorage.equals("-") || "string".startsWith(keyStorage) ||
+                "binary".startsWith(keyStorage)) ||
+            !(valStorage.equals("-") || "string".startsWith(valStorage) ||
+                "binary".startsWith(valStorage))) {
+          throw new SerDeException("Error: To specify a valid column storage type for a Map"
+              + " column, use any two specifiers from '-', a prefix of 'string', "
+              + " and a prefix of 'binary' separated by a ':'."
+              + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the"
+              + " key and value parts of the Map<?,?> respectively."
+              + " Invalid storage specification for column "
+              + serdeParams.getColumnNames().get(i)
+              + "; " + storageInfo[0] + ":" + storageInfo[1]);
+        }
+
+        TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+        TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+        if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
+            !keyTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+
+          if (keyStorage.equals("-")) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else if ("binary".startsWith(keyStorage)) {
+            colMap.binaryStorage.add(true);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
+        } else {
+          colMap.binaryStorage.add(false);
+        }
+
+        if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
+            !valueTypeInfo.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+          if (valStorage.equals("-")) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else if ("binary".startsWith(valStorage)) {
+            colMap.binaryStorage.add(true);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
+        } else {
+          colMap.binaryStorage.add(false);
+        }
+
+        if (colMap.binaryStorage.size() != 2) {
+          throw new SerDeException("Error: In parsing the storage specification for column "
+              + serdeParams.getColumnNames().get(i));
+        }
+
+      } else {
+        // error in storage specification
+        throw new SerDeException("Error: " + HBaseSerDe.HBASE_COLUMNS_MAPPING + " storage specification "
+            + mappingSpec + " is not valid for column: "
+            + serdeParams.getColumnNames().get(i));
+      }
+    }
+  }
+
+  void setKeyColumnOffset() throws SerDeException {
+    setKeyIndex(getRowKeyColumnOffset(columnMapping));
+  }
+
+  public static int getRowKeyColumnOffset(List<ColumnMapping> columnsMapping)
+      throws SerDeException {
+    // Position of the first mapping marked as row key whose family name is HBASE_KEY_COL.
+    int offset = 0;
+    for (ColumnMapping mapping : columnsMapping) {
+      if (mapping.hbaseRowKey && mapping.familyName.equals(HBaseSerDe.HBASE_KEY_COL)) {
+        return offset;
+      }
+      offset++;
+    }
+
+    throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" +
+      " row key column.");
+  }
+
+  public StructTypeInfo getRowTypeInfo() {
+    return (StructTypeInfo) serdeParams.getRowTypeInfo();
+  }
+
+  public List<String> getColumnNames() {
+    return serdeParams.getColumnNames();
+  }
+
+  public byte[] getSeparators() {
+    return serdeParams.getSeparators();
+  }
+
+  public Text getNullSequence() {
+    return serdeParams.getNullSequence();
+  }
+
+  public boolean isLastColumnTakesRest() {
+    return serdeParams.isLastColumnTakesRest();
+  }
+
+  public boolean isEscaped() {
+    return serdeParams.isEscaped();
+  }
+
+  public byte getEscapeChar() {
+    return serdeParams.getEscapeChar();
+  }
+
+  public List<TypeInfo> getColumnTypes() {
+    return serdeParams.getColumnTypes();
+  }
+
+  public SerDeParameters getSerdeParams() {
+    return serdeParams;
+  }
+
+  public void setSerdeParams(SerDeParameters serdeParams) {
+    this.serdeParams = serdeParams;
+  }
+
+  public String getColumnMappingString() {
+    return columnMappingString;
+  }
+
+  public void setColumnMappingString(String columnMappingString) {
+    this.columnMappingString = columnMappingString;
+  }
+
+  public long getPutTimestamp() {
+    return putTimestamp;
+  }
+
+  public void setPutTimestamp(long putTimestamp) {
+    this.putTimestamp = putTimestamp;
+  }
+
+  public boolean isDoColumnRegexMatching() {
+    return doColumnRegexMatching;
+  }
+
+  public void setDoColumnRegexMatching(boolean doColumnRegexMatching) {
+    this.doColumnRegexMatching = doColumnRegexMatching;
+  }
+
+  public Class<?> getCompositeKeyClass() {
+    return compositeKeyClass;
+  }
+
+  public void setCompositeKeyClass(Class<?> compositeKeyClass) {
+    this.compositeKeyClass = compositeKeyClass;
+  }
+
+  public int getKeyIndex() {
+    return keyIndex;
+  }
+
+  public void setKeyIndex(int keyIndex) {
+    this.keyIndex = keyIndex;
+  }
+
+  public List<ColumnMapping> getColumnMapping() {
+    return columnMapping;
+  }
+
+  public ColumnMapping getKeyColumnMapping() {
+    return columnMapping.get(keyIndex);
+  }
+
+  public boolean[] getNeedsEscape() {
+    return serdeParams.getNeedsEscape();
+  }
+}

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1579114&r1=1579113&r2=1579114&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Wed Mar 19 01:52:11 2014
@@ -155,7 +155,7 @@ public class HBaseStorageHandler extends
       Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();
       String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
 
-      List<ColumnMapping> columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+      List<ColumnMapping> columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, true);
 
       HTableDescriptor tableDesc;
 

Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java?rev=1579114&r1=1579113&r2=1579114&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java Wed Mar 19 01:52:11 2014
@@ -466,7 +466,7 @@ public class TestLazyHBaseObject extends
     List<ColumnMapping> columnsMapping = null;
 
     try {
-      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColsMapping);
+      columnsMapping = parseColumnsMapping(hbaseColsMapping);
     } catch (SerDeException e) {
       fail(e.toString());
     }
@@ -591,7 +591,7 @@ public class TestLazyHBaseObject extends
     String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:,cfc:d";
 
     try {
-      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColsMapping);
+      columnsMapping = parseColumnsMapping(hbaseColsMapping);
     } catch (SerDeException e) {
       fail(e.toString());
     }
@@ -716,7 +716,7 @@ public class TestLazyHBaseObject extends
     List<ColumnMapping> columnsMapping = null;
 
     try {
-      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+      columnsMapping = parseColumnsMapping(hbaseColumnsMapping);
     } catch (SerDeException sde) {
       fail(sde.toString());
     }
@@ -850,4 +850,19 @@ public class TestLazyHBaseObject extends
       }
     }
   }
+
+  /**
+   * Parses the HBase columns mapping specifier to identify the column families, qualifiers
+   * and also caches the byte arrays corresponding to them. One of the Hive table
+   * columns maps to the HBase row key, by default the first column.
+   *
+   * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
+   * @return {@code List<ColumnMapping>} which contains the column mapping information by position
+   * @throws SerDeException if {@link HBaseSerDe#parseColumnsMapping} rejects the specification
+   */
+  public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
+      throws SerDeException {
+    return HBaseSerDe.parseColumnsMapping(columnsMappingSpec, true);
+  }
+
 }