Posted to commits@hive.apache.org by ha...@apache.org on 2012/03/09 01:47:38 UTC

svn commit: r1298673 [1/4] - in /hive/trunk: hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/queries/ hbase-handler/src/test/results/ serde/src/java/org/apache/hadoop/hive...

Author: hashutosh
Date: Fri Mar  9 00:47:37 2012
New Revision: 1298673

URL: http://svn.apache.org/viewvc?rev=1298673&view=rev
Log:
HIVE-1634: Allow access to Primitive types stored in binary format in HBase (Basab Maulik, Ashutosh Chauhan via hashutosh)
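
For context, here is a minimal sketch of how the new storage-type specifiers introduced by this change are exercised through the SerDe properties. The column names, family name, and spec values are illustrative, not taken from this commit:

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.hbase.HBaseSerDe;
    import org.apache.hadoop.hive.serde.Constants;

    public class BinaryStorageSketch {
      public static void main(String[] args) throws Exception {
        Properties tbl = new Properties();
        // ':key' maps to the HBase row key; '#b' requests fixed-width binary
        // storage for the int column; '#s:b' keeps map keys as strings while
        // storing map values in binary.
        tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cf:count#b,cf:m#s:b");
        // Table-wide default for columns that carry no '#' specifier.
        tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string");
        tbl.setProperty(Constants.LIST_COLUMNS, "key,count,m");
        tbl.setProperty(Constants.LIST_COLUMN_TYPES, "string:int:map<string,int>");

        HBaseSerDe serDe = new HBaseSerDe();
        serDe.initialize(new Configuration(), tbl); // parses mapping and storage types
      }
    }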

Added:
    hive/trunk/hbase-handler/src/test/queries/hbase_binary_external_table_queries.q
    hive/trunk/hbase-handler/src/test/queries/hbase_binary_map_queries.q
    hive/trunk/hbase-handler/src/test/queries/hbase_binary_storage_queries.q
    hive/trunk/hbase-handler/src/test/results/hbase_binary_external_table_queries.q.out
    hive/trunk/hbase-handler/src/test/results/hbase_binary_map_queries.q.out
    hive/trunk/hbase-handler/src/test/results/hbase_binary_storage_queries.q.out
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioBoolean.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioByte.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioDouble.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioFloat.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioInteger.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioLong.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazydio/LazyDioShort.java
Modified:
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
    hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
    hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
    hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyPrimitive.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Fri Mar  9 00:47:37 2012
@@ -62,16 +62,14 @@ public class HBaseSerDe implements SerDe
 
   public static final String HBASE_COLUMNS_MAPPING = "hbase.columns.mapping";
   public static final String HBASE_TABLE_NAME = "hbase.table.name";
+  public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type";
   public static final String HBASE_KEY_COL = ":key";
   public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
   public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
 
   private ObjectInspector cachedObjectInspector;
   private String hbaseColumnsMapping;
-  private List<String> hbaseColumnFamilies;
-  private List<byte []> hbaseColumnFamiliesBytes;
-  private List<String> hbaseColumnQualifiers;
-  private List<byte []> hbaseColumnQualifiersBytes;
+  private List<ColumnMapping> columnsMapping;
   private SerDeParameters serdeParams;
   private boolean useJSONSerialize;
   private LazyHBaseRow cachedHBaseRow;
@@ -110,8 +108,7 @@ public class HBaseSerDe implements SerDe
   public void initialize(Configuration conf, Properties tbl)
       throws SerDeException {
 
-    initHBaseSerDeParameters(conf, tbl,
-        getClass().getName());
+    initHBaseSerDeParameters(conf, tbl, getClass().getName());
 
     cachedObjectInspector = LazyFactory.createLazyStructInspector(
         serdeParams.getColumnNames(),
@@ -136,118 +133,283 @@ public class HBaseSerDe implements SerDe
   }
 
   /**
-   * Parses the HBase columns mapping to identify the column families, qualifiers
+   * Parses the HBase columns mapping specifier to identify the column families, qualifiers
    * and also caches the byte arrays corresponding to them. One of the Hive table
    * columns maps to the HBase row key, by default the first column.
    *
-   * @param columnMapping - the column mapping specification to be parsed
-   * @param colFamilies - the list of HBase column family names
-   * @param colFamiliesBytes - the corresponding byte array
-   * @param colQualifiers - the list of HBase column qualifier names
-   * @param colQualifiersBytes - the corresponding byte array
-   * @return the row key index in the column names list
+   * @param columnsMappingSpec string hbase.columns.mapping specified when creating table
+   * @return List<ColumnMapping> which contains the column mapping information by position
    * @throws SerDeException
    */
-  public static int parseColumnMapping(
-      String columnMapping,
-      List<String> colFamilies,
-      List<byte []> colFamiliesBytes,
-      List<String> colQualifiers,
-      List<byte []> colQualifiersBytes) throws SerDeException {
-
-    int rowKeyIndex = -1;
-
-    if (colFamilies == null || colQualifiers == null) {
-      throw new SerDeException("Error: caller must pass in lists for the column families " +
-          "and qualifiers.");
-    }
-
-    colFamilies.clear();
-    colQualifiers.clear();
+  public static List<ColumnMapping> parseColumnsMapping(String columnsMappingSpec)
+      throws SerDeException {
 
-    if (columnMapping == null) {
+    if (columnsMappingSpec == null) {
       throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table.");
     }
 
-    if (columnMapping.equals("") || columnMapping.equals(HBASE_KEY_COL)) {
+    if (columnsMappingSpec.equals("") || columnsMappingSpec.equals(HBASE_KEY_COL)) {
       throw new SerDeException("Error: hbase.columns.mapping specifies only the HBase table"
           + " row key. A valid Hive-HBase table must specify at least one additional column.");
     }
 
-    String [] mapping = columnMapping.split(",");
+    int rowKeyIndex = -1;
+    List<ColumnMapping> columnsMapping = new ArrayList<ColumnMapping>();
+    String [] columnSpecs = columnsMappingSpec.split(",");
+    ColumnMapping columnMapping = null;
+
+    for (int i = 0; i < columnSpecs.length; i++) {
+      String mappingSpec = columnSpecs[i];
+      String [] mapInfo = mappingSpec.split("#");
+      String colInfo = mapInfo[0];
 
-    for (int i = 0; i < mapping.length; i++) {
-      String elem = mapping[i];
-      int idxFirst = elem.indexOf(":");
-      int idxLast = elem.lastIndexOf(":");
+      int idxFirst = colInfo.indexOf(":");
+      int idxLast = colInfo.lastIndexOf(":");
 
       if (idxFirst < 0 || !(idxFirst == idxLast)) {
         throw new SerDeException("Error: the HBase columns mapping contains a badly formed " +
             "column family, column qualifier specification.");
       }
 
-      if (elem.equals(HBASE_KEY_COL)) {
+      columnMapping = new ColumnMapping();
+
+      if (colInfo.equals(HBASE_KEY_COL)) {
         rowKeyIndex = i;
-        colFamilies.add(elem);
-        colQualifiers.add(null);
+        columnMapping.familyName = colInfo;
+        columnMapping.familyNameBytes = Bytes.toBytes(colInfo);
+        columnMapping.qualifierName = null;
+        columnMapping.qualifierNameBytes = null;
+        columnMapping.hbaseRowKey = true;
       } else {
-        String [] parts = elem.split(":");
+        String [] parts = colInfo.split(":");
         assert(parts.length > 0 && parts.length <= 2);
-        colFamilies.add(parts[0]);
+        columnMapping.familyName = parts[0];
+        columnMapping.familyNameBytes = Bytes.toBytes(parts[0]);
+        columnMapping.hbaseRowKey = false;
 
         if (parts.length == 2) {
-          colQualifiers.add(parts[1]);
+          columnMapping.qualifierName = parts[1];
+          columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]);
         } else {
-          colQualifiers.add(null);
+          columnMapping.qualifierName = null;
+          columnMapping.qualifierNameBytes = null;
         }
       }
+
+      columnMapping.mappingSpec = mappingSpec;
+
+      columnsMapping.add(columnMapping);
     }
 
     if (rowKeyIndex == -1) {
-      colFamilies.add(0, HBASE_KEY_COL);
-      colQualifiers.add(0, null);
-      rowKeyIndex = 0;
-    }
+      columnMapping = new ColumnMapping();
+      columnMapping.familyName = HBASE_KEY_COL;
+      columnMapping.familyNameBytes = Bytes.toBytes(HBASE_KEY_COL);
+      columnMapping.qualifierName = null;
+      columnMapping.qualifierNameBytes = null;
+      columnMapping.hbaseRowKey = true;
+      columnMapping.mappingSpec = HBASE_KEY_COL;
+      columnsMapping.add(0, columnMapping);
+    }
+
+    return columnsMapping;
+  }
+
+  /*
+   * Utility method for parsing a string of the form '-,b,s,-,s:b,...' as a means of specifying
+   * whether to use a binary or a UTF string format to serialize and de-serialize primitive
+   * data types like boolean, byte, short, int, long, float, and double. This applies to
+   * regular columns and also to map column types which are associated with an HBase column
+   * family. For the map types, we apply the specification to the key or the value provided it
+   * is one of the above primitive types. The specifier is a colon separated value of the form
+   * -:s, or b:b where we have 's', 'b', or '-' on either side of the colon. 's' is for string
+   * format storage, 'b' is for native fixed width byte oriented storage, and '-' uses the
+   * table level default.
+   *
+   * @param hbaseTableDefaultStorageType - the specification associated with the table property
+   *        hbase.table.default.storage.type
+   * @throws SerDeException on parse error.
+   */
 
-    if (colFamilies.size() != colQualifiers.size()) {
-      throw new SerDeException("Error in parsing the hbase columns mapping.");
-    }
+  private void parseColumnStorageTypes(String hbaseTableDefaultStorageType)
+      throws SerDeException {
 
-    // populate the corresponding byte [] if the client has passed in a non-null list
-    if (colFamiliesBytes != null) {
-      colFamiliesBytes.clear();
+    boolean tableBinaryStorage = false;
 
-      for (String fam : colFamilies) {
-        colFamiliesBytes.add(Bytes.toBytes(fam));
+    if (hbaseTableDefaultStorageType != null && !"".equals(hbaseTableDefaultStorageType)) {
+      if (hbaseTableDefaultStorageType.equals("binary")) {
+        tableBinaryStorage = true;
+      } else if (!hbaseTableDefaultStorageType.equals("string")) {
+        throw new SerDeException("Error: " + HBASE_TABLE_DEFAULT_STORAGE_TYPE +
+            " parameter must be specified as" +
+            " 'string' or 'binary'; '" + hbaseTableDefaultStorageType +
+            "' is not a valid specification for this table/serde property.");
       }
     }
 
-    if (colQualifiersBytes != null) {
-      colQualifiersBytes.clear();
+    // parse the string to determine column level storage type for primitive types
+    // 's' is for variable length string format storage
+    // 'b' is for fixed width binary storage of bytes
+    // '-' is for table storage type, which defaults to UTF8 string
+    // string data is always stored in the default escaped storage format; the data types
+    // byte, short, int, long, float, and double have a binary byte oriented storage option
+    List<TypeInfo> columnTypes = serdeParams.getColumnTypes();
+
+    for (int i = 0; i < columnsMapping.size(); i++) {
+
+      ColumnMapping colMap = columnsMapping.get(i);
+      TypeInfo colType = columnTypes.get(i);
+      String mappingSpec = colMap.mappingSpec;
+      String [] mapInfo = mappingSpec.split("#");
+      String [] storageInfo = null;
+
+      if (mapInfo.length == 2) {
+        storageInfo = mapInfo[1].split(":");
+      }
+
+      if (storageInfo == null) {
+
+        // use the table default storage specification
+        if (colType.getCategory() == Category.PRIMITIVE) {
+          if (!colType.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
+        } else if (colType.getCategory() == Category.MAP) {
+          TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+          TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+          if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
+              !keyTypeInfo.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
 
-      for (String qual : colQualifiers) {
-        if (qual == null) {
-          colQualifiersBytes.add(null);
+          if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
+              !valueTypeInfo.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
         } else {
-          colQualifiersBytes.add(Bytes.toBytes(qual));
+          colMap.binaryStorage.add(false);
+        }
+
+      } else if (storageInfo.length == 1) {
+        // we have a storage specification for a primitive column type
+        String storageOption = storageInfo[0];
+
+        if ((colType.getCategory() == Category.MAP) ||
+            !(storageOption.equals("-") || "string".startsWith(storageOption) ||
+                "binary".startsWith(storageOption))) {
+          throw new SerDeException("Error: A column storage specification is one of the following:"
+              + " '-', a prefix of 'string', or a prefix of 'binary'. "
+              + storageOption + " is not a valid storage option specification for "
+              + serdeParams.getColumnNames().get(i));
+        }
+
+        if (colType.getCategory() == Category.PRIMITIVE &&
+            !colType.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+
+          if ("-".equals(storageOption)) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else if ("binary".startsWith(storageOption)) {
+            colMap.binaryStorage.add(true);
+          } else {
+              colMap.binaryStorage.add(false);
+          }
+        } else {
+          colMap.binaryStorage.add(false);
+        }
+
+      } else if (storageInfo.length == 2) {
+        // we have a storage specification for a map column type
+
+        String keyStorage = storageInfo[0];
+        String valStorage = storageInfo[1];
+
+        if ((colType.getCategory() != Category.MAP) ||
+            !(keyStorage.equals("-") || "string".startsWith(keyStorage) ||
+                "binary".startsWith(keyStorage)) ||
+            !(valStorage.equals("-") || "string".startsWith(valStorage) ||
+                "binary".startsWith(valStorage))) {
+          throw new SerDeException("Error: To specify a valid column storage type for a Map"
+              + " column, use any two specifiers from '-', a prefix of 'string', "
+              + " and a prefix of 'binary' separated by a ':'."
+              + " Valid examples are '-:-', 's:b', etc. They specify the storage type for the"
+              + " key and value parts of the Map<?,?> respectively."
+              + " Invalid storage specification for column "
+              + serdeParams.getColumnNames().get(i)
+              + "; " + storageInfo[0] + ":" + storageInfo[1]);
+        }
+
+        TypeInfo keyTypeInfo = ((MapTypeInfo) colType).getMapKeyTypeInfo();
+        TypeInfo valueTypeInfo = ((MapTypeInfo) colType).getMapValueTypeInfo();
+
+        if (keyTypeInfo.getCategory() == Category.PRIMITIVE &&
+            !keyTypeInfo.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+
+          if (keyStorage.equals("-")) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else if ("binary".startsWith(keyStorage)) {
+            colMap.binaryStorage.add(true);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
+        } else {
+          colMap.binaryStorage.add(false);
+        }
+
+        if (valueTypeInfo.getCategory() == Category.PRIMITIVE &&
+            !valueTypeInfo.getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+          if (valStorage.equals("-")) {
+            colMap.binaryStorage.add(tableBinaryStorage);
+          } else if ("binary".startsWith(valStorage)) {
+            colMap.binaryStorage.add(true);
+          } else {
+            colMap.binaryStorage.add(false);
+          }
+        } else {
+          colMap.binaryStorage.add(false);
+        }
+
+        if (colMap.binaryStorage.size() != 2) {
+          throw new SerDeException("Error: In parsing the storage specification for column "
+              + serdeParams.getColumnNames().get(i));
         }
-      }
-    }
 
-    if (colFamiliesBytes != null && colQualifiersBytes != null) {
-      if (colFamiliesBytes.size() != colQualifiersBytes.size()) {
-        throw new SerDeException("Error in caching the bytes for the hbase column families " +
-            "and qualifiers.");
+      } else {
+        // error in storage specification
+        throw new SerDeException("Error: " + HBASE_COLUMNS_MAPPING + " storage specification "
+            + mappingSpec + " is not valid for column: "
+            + serdeParams.getColumnNames().get(i));
       }
     }
-
-    return rowKeyIndex;
   }
 
-  public static boolean isSpecialColumn(String hbaseColumnName) {
+
+  public static boolean isRowKeyColumn(String hbaseColumnName) {
     return hbaseColumnName.equals(HBASE_KEY_COL);
   }
 
+
+  static class ColumnMapping {
+
+    ColumnMapping() {
+      binaryStorage = new ArrayList<Boolean>(2);
+    }
+
+    String familyName;
+    String qualifierName;
+    byte [] familyNameBytes;
+    byte [] qualifierNameBytes;
+    List<Boolean> binaryStorage;
+    boolean hbaseRowKey;
+    String mappingSpec;
+  }
+
   private void initHBaseSerDeParameters(
       Configuration job, Properties tbl, String serdeName)
     throws SerDeException {
@@ -257,33 +419,27 @@ public class HBaseSerDe implements SerDe
     String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
     putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
 
-    // Parse the HBase columns mapping and initialize the col family & qualifiers
-    hbaseColumnFamilies = new ArrayList<String>();
-    hbaseColumnFamiliesBytes = new ArrayList<byte []>();
-    hbaseColumnQualifiers = new ArrayList<String>();
-    hbaseColumnQualifiersBytes = new ArrayList<byte []>();
-    iKey = parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
-        hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
+    // Parse and initialize the HBase columns mapping
+    columnsMapping = parseColumnsMapping(hbaseColumnsMapping);
 
     // Build the type property string if not supplied
     if (columnTypeProperty == null) {
       StringBuilder sb = new StringBuilder();
 
-      for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
+      for (int i = 0; i < columnsMapping.size(); i++) {
         if (sb.length() > 0) {
           sb.append(":");
         }
-        String colFamily = hbaseColumnFamilies.get(i);
-        String colQualifier = hbaseColumnQualifiers.get(i);
-        if (isSpecialColumn(colFamily)) {
-            // the row key column becomes a STRING
-            sb.append(Constants.STRING_TYPE_NAME);
-        } else if (colQualifier == null)  {
+
+        ColumnMapping colMap = columnsMapping.get(i);
+
+        if (colMap.hbaseRowKey) {
+          // the row key column becomes a STRING
+          sb.append(Constants.STRING_TYPE_NAME);
+        } else if (colMap.qualifierName == null)  {
           // a column family become a MAP
-          sb.append(
-            Constants.MAP_TYPE_NAME + "<"
-            + Constants.STRING_TYPE_NAME
-            + "," + Constants.STRING_TYPE_NAME + ">");
+          sb.append(Constants.MAP_TYPE_NAME + "<" + Constants.STRING_TYPE_NAME + ","
+              + Constants.STRING_TYPE_NAME + ">");
         } else {
           // an individual column becomes a STRING
           sb.append(Constants.STRING_TYPE_NAME);
@@ -294,11 +450,11 @@ public class HBaseSerDe implements SerDe
 
     serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
 
-    if (hbaseColumnFamilies.size() != serdeParams.getColumnNames().size()) {
+    if (columnsMapping.size() != serdeParams.getColumnNames().size()) {
       throw new SerDeException(serdeName + ": columns has " +
         serdeParams.getColumnNames().size() +
         " elements while hbase.columns.mapping has " +
-        hbaseColumnFamilies.size() + " elements" +
+        columnsMapping.size() + " elements" +
         " (counting the key if implicit)");
     }
 
@@ -308,24 +464,29 @@ public class HBaseSerDe implements SerDe
     needsEscape = serdeParams.getNeedsEscape();
 
     // check that the mapping schema is right;
-    // check that the "column-family:" is mapped to MAP<String,?>
-    for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
-      String colFamily = hbaseColumnFamilies.get(i);
-      String colQualifier = hbaseColumnQualifiers.get(i);
-      if (colQualifier == null && !isSpecialColumn(colFamily)) {
+    // check that the "column-family:" is mapped to  Map<key,?>
+    // where key extends LazyPrimitive<?, ?> and thus has type Category.PRIMITIVE
+    for (int i = 0; i < columnsMapping.size(); i++) {
+      ColumnMapping colMap = columnsMapping.get(i);
+      if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
         TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
         if ((typeInfo.getCategory() != Category.MAP) ||
-          (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName()
-            !=  Constants.STRING_TYPE_NAME)) {
+          (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
+            !=  Category.PRIMITIVE)) {
 
           throw new SerDeException(
-            serdeName + ": hbase column family '"
-            + colFamily
-            + "' should be mapped to Map<String,?> but is mapped to "
+            serdeName + ": hbase column family '" + colMap.familyName
+            + "' should be mapped to Map<? extends LazyPrimitive<?, ?>,?>, that is "
+            + "the Key for the map should be of primitive type, but is mapped to "
             + typeInfo.getTypeName());
         }
       }
     }
+
+    // Precondition: make sure this is done after the rest of the SerDe initialization is done.
+    String hbaseTableStorageType = tbl.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE);
+    parseColumnStorageTypes(hbaseTableStorageType);
+    setKeyColumnOffset();
   }
 
   /**
@@ -341,8 +502,7 @@ public class HBaseSerDe implements SerDe
       throw new SerDeException(getClass().getName() + ": expects Result!");
     }
 
-    cachedHBaseRow.init((Result) result, hbaseColumnFamilies, hbaseColumnFamiliesBytes,
-        hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
+    cachedHBaseRow.init((Result) result, columnsMapping);
 
     return cachedHBaseRow;
   }
@@ -413,9 +573,8 @@ public class HBaseSerDe implements SerDe
     List<Object> list,
     List<? extends StructField> declaredFields) throws IOException {
 
-    // column name
-    String hbaseColumnFamily = hbaseColumnFamilies.get(i);
-    String hbaseColumnQualifier = hbaseColumnQualifiers.get(i);
+    // column mapping info
+    ColumnMapping colMap = columnsMapping.get(i);
 
     // Get the field objectInspector and the field object.
     ObjectInspector foi = fields.get(i).getFieldObjectInspector();
@@ -427,8 +586,8 @@ public class HBaseSerDe implements SerDe
     }
 
     // If the field corresponds to a column family in HBase
-    if (hbaseColumnQualifier == null && !isSpecialColumn(hbaseColumnFamily)) {
-      MapObjectInspector moi = (MapObjectInspector)foi;
+    if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+      MapObjectInspector moi = (MapObjectInspector) foi;
       ObjectInspector koi = moi.getMapKeyObjectInspector();
       ObjectInspector voi = moi.getMapValueObjectInspector();
 
@@ -439,7 +598,12 @@ public class HBaseSerDe implements SerDe
         for (Map.Entry<?, ?> entry: map.entrySet()) {
           // Get the Key
           serializeStream.reset();
-          serialize(entry.getKey(), koi, 3);
+
+          // Map keys are required to be primitive and may be serialized in binary format
+          boolean isNotNull = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0));
+          if (!isNotNull) {
+            continue;
+          }
 
           // Get the column-qualifier
           byte [] columnQualifierBytes = new byte[serializeStream.getCount()];
@@ -448,13 +612,16 @@ public class HBaseSerDe implements SerDe
 
           // Get the Value
           serializeStream.reset();
-          boolean isNotNull = serialize(entry.getValue(), voi, 3);
+
+          // Map values may be serialized in binary format when they are primitive and binary
+          // serialization is the option selected
+          isNotNull = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1));
           if (!isNotNull) {
             continue;
           }
           byte [] value = new byte[serializeStream.getCount()];
           System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount());
-          put.add(hbaseColumnFamiliesBytes.get(i), columnQualifierBytes, value);
+          put.add(colMap.familyNameBytes, columnQualifierBytes, value);
         }
       }
     } else {
@@ -470,12 +637,15 @@ public class HBaseSerDe implements SerDe
               declaredFields.get(i).getFieldObjectInspector().getCategory()
               .equals(Category.PRIMITIVE) || useJSONSerialize)) {
 
+        // we always serialize the String type using the escaped algorithm for LazyString
         isNotNull = serialize(
             SerDeUtils.getJSONString(f, foi),
             PrimitiveObjectInspectorFactory.javaStringObjectInspector,
-            1);
+            1, false);
       } else {
-        isNotNull = serialize(f, foi, 1);
+        // use the serialization option switch to write primitive values as either a variable
+        // length UTF8 string or a fixed width bytes if serializing in binary format
+        isNotNull = serialize(f, foi, 1, colMap.binaryStorage.get(0));
       }
       if (!isNotNull) {
         return null;
@@ -485,32 +655,49 @@ public class HBaseSerDe implements SerDe
       if (i == iKey) {
         return key;
       }
-      put.add(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i), key);
+      put.add(colMap.familyNameBytes, colMap.qualifierNameBytes, key);
     }
 
     return null;
   }
 
-  /**
+  /*
    * Serialize the row into a ByteStream.
    *
    * @param obj           The object for the current field.
    * @param objInspector  The ObjectInspector for the current Object.
    * @param level         The current level of separator.
-   * @throws IOException
-   * @return true, if serialize is a not-null object; otherwise false.
+   * @param writeBinary   Whether to write a primitive object as a UTF8 variable length string or
+   *                      as a fixed width byte array onto the byte stream.
+   * @throws IOException  On error in writing to the serialization stream.
+   * @return true         On serializing a non-null object, otherwise false.
    */
-  private boolean serialize(Object obj, ObjectInspector objInspector, int level)
-      throws IOException {
+  private boolean serialize(
+      Object obj,
+      ObjectInspector objInspector,
+      int level,
+      boolean writeBinary) throws IOException {
+
+    if (objInspector.getCategory() == Category.PRIMITIVE && writeBinary) {
+      LazyUtils.writePrimitive(serializeStream, obj, (PrimitiveObjectInspector) objInspector);
+      return true;
+    } else {
+      return serialize(obj, objInspector, level);
+    }
+  }
+
+  private boolean serialize(
+      Object obj,
+      ObjectInspector objInspector,
+      int level) throws IOException {
 
     switch (objInspector.getCategory()) {
       case PRIMITIVE: {
-        LazyUtils.writePrimitiveUTF8(
-          serializeStream, obj,
-          (PrimitiveObjectInspector) objInspector,
-          escaped, escapeChar, needsEscape);
+        LazyUtils.writePrimitiveUTF8(serializeStream, obj,
+            (PrimitiveObjectInspector) objInspector, escaped, escapeChar, needsEscape);
         return true;
       }
+
       case LIST: {
         char separator = (char) separators[level];
         ListObjectInspector loi = (ListObjectInspector)objInspector;
@@ -528,6 +715,7 @@ public class HBaseSerDe implements SerDe
         }
         return true;
       }
+
       case MAP: {
         char separator = (char) separators[level];
         char keyValueSeparator = (char) separators[level+1];
@@ -553,6 +741,7 @@ public class HBaseSerDe implements SerDe
         }
         return true;
       }
+
       case STRUCT: {
         char separator = (char)separators[level];
         StructObjectInspector soi = (StructObjectInspector)objInspector;
@@ -565,7 +754,9 @@ public class HBaseSerDe implements SerDe
             if (i > 0) {
               serializeStream.write(separator);
             }
-            serialize(list.get(i), fields.get(i).getFieldObjectInspector(), level + 1);
+
+            serialize(list.get(i), fields.get(i).getFieldObjectInspector(),
+                level + 1);
           }
         }
         return true;
@@ -601,4 +792,23 @@ public class HBaseSerDe implements SerDe
     // no support for statistics
     return null;
   }
+
+  void setKeyColumnOffset() throws SerDeException {
+    iKey = getRowKeyColumnOffset(columnsMapping);
+  }
+
+  public static int getRowKeyColumnOffset(List<ColumnMapping> columnsMapping)
+      throws SerDeException {
+
+    for (int i = 0; i < columnsMapping.size(); i++) {
+      ColumnMapping colMap = columnsMapping.get(i);
+
+      if (colMap.hbaseRowKey && colMap.familyName.equals(HBASE_KEY_COL)) {
+        return i;
+      }
+    }
+
+    throw new SerDeException("HBaseSerDe Error: columns mapping list does not contain" +
+      " row key column.");
+  }
 }
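
As an illustration of the refactored parser, consider a hypothetical mapping spec. ColumnMapping is package-private, so a sketch like this only compiles from within org.apache.hadoop.hive.hbase:

    // No explicit ':key' entry, so an implicit row-key mapping is inserted at
    // position 0 and the returned list has one more entry than the spec.
    List<ColumnMapping> cols = HBaseSerDe.parseColumnsMapping("cf:a,cf:b#b,cf2:");
    // cols.get(0): familyName ":key", hbaseRowKey true,  mappingSpec ":key"
    // cols.get(1): familyName "cf",   qualifierName "a",  mappingSpec "cf:a"
    // cols.get(2): familyName "cf",   qualifierName "b",  mappingSpec "cf:b#b"
    // cols.get(3): familyName "cf2",  qualifierName null (a whole column family)
    int iKey = HBaseSerDe.getRowKeyColumnOffset(cols); // 0

    // binaryStorage is still empty at this point; it is populated per column by
    // the private parseColumnStorageTypes() during SerDe initialization.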

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsAggregator.java Fri Mar  9 00:47:37 2012
@@ -49,8 +49,8 @@ public class HBaseStatsAggregator implem
   public boolean connect(Configuration hiveconf) {
 
     try {
-      HBaseConfiguration hbaseConf = new HBaseConfiguration(hiveconf);
-      htable = new HTable(hbaseConf, HBaseStatsSetupConstants.PART_STAT_TABLE_NAME);
+      htable = new HTable(HBaseConfiguration.create(hiveconf),
+        HBaseStatsSetupConstants.PART_STAT_TABLE_NAME);
 
       return true;
     } catch (IOException e) {
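
The same constructor-to-factory change recurs in several files below; a minimal before/after sketch of the pattern (the table name is hypothetical):

    // Before: the HBaseConfiguration(Configuration) constructor, which HBase
    // deprecated in favor of the static factory.
    //   HBaseConfiguration hbaseConf = new HBaseConfiguration(hiveconf);
    // After: create() returns a plain Configuration with the HBase resources
    // (hbase-default.xml, hbase-site.xml) layered over the passed-in conf.
    Configuration hbaseConf = HBaseConfiguration.create(hiveconf);
    HTable htable = new HTable(hbaseConf, "my_table");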

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStatsPublisher.java Fri Mar  9 00:47:37 2012
@@ -49,8 +49,8 @@ public class HBaseStatsPublisher impleme
   public boolean connect(Configuration hiveconf) {
 
     try {
-      HBaseConfiguration hbaseConf = new HBaseConfiguration(hiveconf);
-      htable = new HTable(hbaseConf, HBaseStatsSetupConstants.PART_STAT_TABLE_NAME);
+      htable = new HTable(HBaseConfiguration.create(hiveconf),
+        HBaseStatsSetupConstants.PART_STAT_TABLE_NAME);
       // for performance reason, defer update until the closeConnection
       htable.setAutoFlush(false);
     } catch (IOException e) {
@@ -130,8 +130,7 @@ public class HBaseStatsPublisher impleme
    */
   public boolean init(Configuration hiveconf) {
     try {
-      HBaseConfiguration hbaseConf = new HBaseConfiguration(hiveconf);
-      HBaseAdmin hbase = new HBaseAdmin(hbaseConf);
+      HBaseAdmin hbase = new HBaseAdmin(HBaseConfiguration.create(hiveconf));
 
       // Creating table if not exists
       if (!hbase.tableExists(HBaseStatsSetupConstants.PART_STAT_TABLE_NAME)) {

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Fri Mar  9 00:47:37 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.MasterNot
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Constants;
@@ -63,7 +64,7 @@ public class HBaseStorageHandler extends
 
   final static public String DEFAULT_PREFIX = "default.";
 
-  private HBaseConfiguration hbaseConf;
+  private Configuration hbaseConf;
   private HBaseAdmin admin;
 
   private HBaseAdmin getHBaseAdmin() throws MetaException {
@@ -137,17 +138,9 @@ public class HBaseStorageHandler extends
       String tableName = getHBaseTableName(tbl);
       Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();
       String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+      List<ColumnMapping> columnsMapping = null;
 
-      if (hbaseColumnsMapping == null) {
-        throw new MetaException("No hbase.columns.mapping defined in Serde.");
-      }
-
-      List<String> hbaseColumnFamilies = new ArrayList<String>();
-      List<String> hbaseColumnQualifiers = new ArrayList<String>();
-      List<byte []> hbaseColumnFamiliesBytes = new ArrayList<byte []>();
-      List<byte []> hbaseColumnQualifiersBytes = new ArrayList<byte []>();
-      int iKey = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
-          hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
+      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
 
       HTableDescriptor tableDesc;
 
@@ -156,8 +149,13 @@ public class HBaseStorageHandler extends
         if (!isExternal) {
           // Create the column descriptors
           tableDesc = new HTableDescriptor(tableName);
-          Set<String> uniqueColumnFamilies = new HashSet<String>(hbaseColumnFamilies);
-          uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
+          Set<String> uniqueColumnFamilies = new HashSet<String>();
+
+          for (ColumnMapping colMap : columnsMapping) {
+            if (!colMap.hbaseRowKey) {
+              uniqueColumnFamilies.add(colMap.familyName);
+            }
+          }
 
           for (String columnFamily : uniqueColumnFamilies) {
             tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes(columnFamily)));
@@ -179,13 +177,15 @@ public class HBaseStorageHandler extends
         // make sure the schema mapping is right
         tableDesc = getHBaseAdmin().getTableDescriptor(Bytes.toBytes(tableName));
 
-        for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
-          if (i == iKey) {
+        for (int i = 0; i < columnsMapping.size(); i++) {
+          ColumnMapping colMap = columnsMapping.get(i);
+
+          if (colMap.hbaseRowKey) {
             continue;
           }
 
-          if (!tableDesc.hasFamily(hbaseColumnFamiliesBytes.get(i))) {
-            throw new MetaException("Column Family " + hbaseColumnFamilies.get(i)
+          if (!tableDesc.hasFamily(colMap.familyNameBytes)) {
+            throw new MetaException("Column Family " + colMap.familyName
                 + " is not defined in hbase table " + tableName);
           }
         }
@@ -231,7 +231,7 @@ public class HBaseStorageHandler extends
 
   @Override
   public void setConf(Configuration conf) {
-    hbaseConf = new HBaseConfiguration(conf);
+    hbaseConf = HBaseConfiguration.create(conf);
   }
 
   @Override

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Fri Mar  9 00:47:37 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
 import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
@@ -79,23 +80,18 @@ public class HiveHBaseTableInputFormat e
     HBaseSplit hbaseSplit = (HBaseSplit) split;
     TableSplit tableSplit = hbaseSplit.getSplit();
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
-    setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
+    setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-    List<String> hbaseColumnFamilies = new ArrayList<String>();
-    List<String> hbaseColumnQualifiers = new ArrayList<String>();
-    List<byte []> hbaseColumnFamiliesBytes = new ArrayList<byte []>();
-    List<byte []> hbaseColumnQualifiersBytes = new ArrayList<byte []>();
+    List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
+    List<ColumnMapping> columnsMapping = null;
 
-    int iKey;
     try {
-      iKey = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
-          hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
-    } catch (SerDeException se) {
-      throw new IOException(se);
+      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+    } catch (SerDeException e) {
+      throw new IOException(e);
     }
-    List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
 
-    if (hbaseColumnFamilies.size() < readColIDs.size()) {
+    if (columnsMapping.size() < readColIDs.size()) {
       throw new IOException("Cannot read more columns than the given table contains.");
     }
 
@@ -105,14 +101,15 @@ public class HiveHBaseTableInputFormat e
 
     if (!addAll) {
       for (int i : readColIDs) {
-        if (i == iKey) {
+        ColumnMapping colMap = columnsMapping.get(i);
+        if (colMap.hbaseRowKey) {
           continue;
         }
 
-        if (hbaseColumnQualifiers.get(i) == null) {
-          scan.addFamily(hbaseColumnFamiliesBytes.get(i));
+        if (colMap.qualifierName == null) {
+          scan.addFamily(colMap.familyNameBytes);
         } else {
-          scan.addColumn(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i));
+          scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
         }
 
         empty = false;
@@ -125,15 +122,16 @@ public class HiveHBaseTableInputFormat e
     // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
     // tables column projection.
     if (empty) {
-      for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
-        if (i == iKey) {
+      for (int i = 0; i < columnsMapping.size(); i++) {
+        ColumnMapping colMap = columnsMapping.get(i);
+        if (colMap.hbaseRowKey) {
           continue;
         }
 
-        if (hbaseColumnQualifiers.get(i) == null) {
-          scan.addFamily(hbaseColumnFamiliesBytes.get(i));
+        if (colMap.qualifierName == null) {
+          scan.addFamily(colMap.familyNameBytes);
         } else {
-          scan.addColumn(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i));
+          scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
         }
 
         if (!addAll) {
@@ -144,10 +142,16 @@ public class HiveHBaseTableInputFormat e
 
     // If Hive's optimizer gave us a filter to process, convert it to the
     // HBase scan form now.
-    tableSplit = convertFilter(jobConf, scan, tableSplit, iKey);
+    int iKey = -1;
 
-    setScan(scan);
+    try {
+      iKey = HBaseSerDe.getRowKeyColumnOffset(columnsMapping);
+    } catch (SerDeException e) {
+      throw new IOException(e);
+    }
 
+    tableSplit = convertFilter(jobConf, scan, tableSplit, iKey);
+    setScan(scan);
     Job job = new Job(jobConf);
     TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
         job.getConfiguration(), reporter);
@@ -363,50 +367,53 @@ public class HiveHBaseTableInputFormat e
   public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
 
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
-    setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
+    setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName)));
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
 
     if (hbaseColumnsMapping == null) {
       throw new IOException("hbase.columns.mapping required for HBase Table.");
     }
 
-    List<String> hbaseColumnFamilies = new ArrayList<String>();
-    List<String> hbaseColumnQualifiers = new ArrayList<String>();
-    List<byte []> hbaseColumnFamiliesBytes = new ArrayList<byte []>();
-    List<byte []> hbaseColumnQualifiersBytes = new ArrayList<byte []>();
+    List<ColumnMapping> columnsMapping = null;
+    try {
+      columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+    } catch (SerDeException e) {
+      throw new IOException(e);
+    }
 
     int iKey;
+
     try {
-      iKey = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
-          hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
-    } catch (SerDeException se) {
-      throw new IOException(se);
+      iKey = HBaseSerDe.getRowKeyColumnOffset(columnsMapping);
+    } catch (SerDeException e) {
+      throw new IOException(e);
     }
 
     Scan scan = new Scan();
 
-    // Take filter pushdown into account while calculating splits; this
-    // allows us to prune off regions immediately.  Note that although
-    // the Javadoc for the superclass getSplits says that it returns one
-    // split per region, the implementation actually takes the scan
-    // definition into account and excludes regions which don't satisfy
-    // the start/stop row conditions (HBASE-1829).
-    convertFilter(jobConf, scan, null, iKey);
-
     // REVIEW:  are we supposed to be applying the getReadColumnIDs
     // same as in getRecordReader?
-    for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
-      if (i == iKey) {
+    for (int i = 0; i <columnsMapping.size(); i++) {
+      ColumnMapping colMap = columnsMapping.get(i);
+      if (colMap.hbaseRowKey) {
         continue;
       }
 
-      if (hbaseColumnQualifiers.get(i) == null) {
-        scan.addFamily(hbaseColumnFamiliesBytes.get(i));
+      if (colMap.qualifierName == null) {
+        scan.addFamily(colMap.familyNameBytes);
       } else {
-        scan.addColumn(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i));
+        scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
       }
     }
 
+    // Take filter pushdown into account while calculating splits; this
+    // allows us to prune off regions immediately.  Note that although
+    // the Javadoc for the superclass getSplits says that it returns one
+    // split per region, the implementation actually takes the scan
+    // definition into account and excludes regions which don't satisfy
+    // the start/stop row conditions (HBASE-1829).
+    convertFilter(jobConf, scan, null, iKey);
+
     setScan(scan);
     Job job = new Job(jobConf);
     JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Fri Mar  9 00:47:37 2012
@@ -78,7 +78,7 @@ public class HiveHBaseTableOutputFormat 
     jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
     final boolean walEnabled = HiveConf.getBoolVar(
         jc, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
-    final HTable table = new HTable(new HBaseConfiguration(jc), hbaseTableName);
+    final HTable table = new HTable(HBaseConfiguration.create(jc), hbaseTableName);
     table.setAutoFlush(false);
 
     return new RecordWriter() {

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java Fri Mar  9 00:47:37 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.hbase;
 
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Map.Entry;
@@ -41,6 +42,7 @@ public class LazyHBaseCellMap extends La
 
   private Result result;
   private byte [] columnFamilyBytes;
+  private List<Boolean> binaryStorage;
 
   /**
    * Construct a LazyCellMap object with the ObjectInspector.
@@ -50,9 +52,14 @@ public class LazyHBaseCellMap extends La
     super(oi);
   }
 
-  public void init(Result r, byte [] columnFamilyBytes) {
+  public void init(
+      Result r,
+      byte [] columnFamilyBytes,
+      List<Boolean> binaryStorage) {
+
     result = r;
     this.columnFamilyBytes = columnFamilyBytes;
+    this.binaryStorage = binaryStorage;
     setParsed(false);
   }
 
@@ -73,10 +80,13 @@ public class LazyHBaseCellMap extends La
           continue;
         }
 
+        LazyMapObjectInspector lazyMoi = getInspector();
+
         // Keys are always primitive
         LazyPrimitive<? extends ObjectInspector, ? extends Writable> key =
           LazyFactory.createLazyPrimitiveClass(
-              (PrimitiveObjectInspector) getInspector().getMapKeyObjectInspector());
+              (PrimitiveObjectInspector) lazyMoi.getMapKeyObjectInspector(),
+              binaryStorage.get(0));
 
         ByteArrayRef keyRef = new ByteArrayRef();
         keyRef.setData(e.getKey());
@@ -84,8 +94,8 @@ public class LazyHBaseCellMap extends La
 
         // Value
         LazyObject<?> value =
-          LazyFactory.createLazyObject(
-              getInspector().getMapValueObjectInspector());
+          LazyFactory.createLazyObject(lazyMoi.getMapValueObjectInspector(),
+              binaryStorage.get(1));
 
         ByteArrayRef valueRef = new ByteArrayRef();
         valueRef.setData(e.getValue());
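
For illustration, this is how a caller supplies the per-key/per-value flags to the new init() signature; 'cellMap' (a LazyHBaseCellMap) and 'result' (an HBase Result) are assumed to be in scope, and the family name is hypothetical:

    // binaryStorage.get(0) governs map keys, binaryStorage.get(1) map values.
    // With [false, true], keys are parsed as escaped UTF8 text while values are
    // decoded by the new lazydio classes (e.g. LazyDioInteger reads an int from
    // its fixed-width byte form instead of parsing digits).
    List<Boolean> binaryStorage = Arrays.asList(Boolean.FALSE, Boolean.TRUE);
    cellMap.init(result, Bytes.toBytes("cf"), binaryStorage);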

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Fri Mar  9 00:47:37 2012
@@ -23,12 +23,14 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyObject;
 import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
@@ -42,10 +44,7 @@ public class LazyHBaseRow extends LazySt
    * The HBase columns mapping of the row.
    */
   private Result result;
-  private List<String> hbaseColumnFamilies;
-  private List<byte []> hbaseColumnFamiliesBytes;
-  private List<String> hbaseColumnQualifiers;
-  private List<byte []> hbaseColumnQualifiersBytes;
+  private List<ColumnMapping> columnsMapping;
   private ArrayList<Object> cachedList;
 
   /**
@@ -59,18 +58,10 @@ public class LazyHBaseRow extends LazySt
    * Set the HBase row data(a Result writable) for this LazyStruct.
    * @see LazyHBaseRow#init(Result)
    */
-  public void init(
-      Result r,
-      List<String> hbaseColumnFamilies,
-      List<byte []> hbaseColumnFamiliesBytes,
-      List<String> hbaseColumnQualifiers,
-      List<byte []> hbaseColumnQualifiersBytes) {
+  public void init(Result r, List<ColumnMapping> columnsMapping) {
 
     result = r;
-    this.hbaseColumnFamilies = hbaseColumnFamilies;
-    this.hbaseColumnFamiliesBytes = hbaseColumnFamiliesBytes;
-    this.hbaseColumnQualifiers = hbaseColumnQualifiers;
-    this.hbaseColumnQualifiersBytes = hbaseColumnQualifiersBytes;
+    this.columnsMapping = columnsMapping;
     setParsed(false);
   }
 
@@ -79,25 +70,31 @@ public class LazyHBaseRow extends LazySt
    * @see LazyStruct#parse()
    */
   private void parse() {
+
     if (getFields() == null) {
       List<? extends StructField> fieldRefs =
         ((StructObjectInspector)getInspector()).getAllStructFieldRefs();
-      setFields(new LazyObject[fieldRefs.size()]);
-      for (int i = 0; i < getFields().length; i++) {
-        String hbaseColumnFamily = hbaseColumnFamilies.get(i);
-        String hbaseColumnQualifier = hbaseColumnQualifiers.get(i);
+      LazyObject<? extends ObjectInspector> [] fields = new LazyObject<?>[fieldRefs.size()];
+
+      for (int i = 0; i < fields.length; i++) {
+        ColumnMapping colMap = columnsMapping.get(i);
 
-        if (hbaseColumnQualifier == null && !HBaseSerDe.isSpecialColumn(hbaseColumnFamily)) {
+        if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
           // a column family
-          getFields()[i] = new LazyHBaseCellMap(
+          fields[i] = new LazyHBaseCellMap(
               (LazyMapObjectInspector) fieldRefs.get(i).getFieldObjectInspector());
           continue;
         }
 
-        getFields()[i] = LazyFactory.createLazyObject(fieldRefs.get(i).getFieldObjectInspector());
+        fields[i] = LazyFactory.createLazyObject(
+            fieldRefs.get(i).getFieldObjectInspector(),
+            colMap.binaryStorage.get(0));
       }
-      setFieldInited(new boolean[getFields().length]);
+
+      setFields(fields);
+      setFieldInited(new boolean[fields.length]);
     }
+
     Arrays.fill(getFieldInited(), false);
     setParsed(true);
   }
@@ -119,6 +116,7 @@ public class LazyHBaseRow extends LazySt
     if (!getParsed()) {
       parse();
     }
+
     return uncheckedGetField(fieldID);
   }
 
@@ -130,25 +128,27 @@ public class LazyHBaseRow extends LazySt
    * @return  The value of the field
    */
   private Object uncheckedGetField(int fieldID) {
-    if (!getFieldInited()[fieldID]) {
-      getFieldInited()[fieldID] = true;
+
+    LazyObject<?> [] fields = getFields();
+    boolean [] fieldsInited = getFieldInited();
+
+    if (!fieldsInited[fieldID]) {
+      fieldsInited[fieldID] = true;
       ByteArrayRef ref = null;
-      String columnFamily = hbaseColumnFamilies.get(fieldID);
-      String columnQualifier = hbaseColumnQualifiers.get(fieldID);
-      byte [] columnFamilyBytes = hbaseColumnFamiliesBytes.get(fieldID);
-      byte [] columnQualifierBytes = hbaseColumnQualifiersBytes.get(fieldID);
+      ColumnMapping colMap = columnsMapping.get(fieldID);
 
-      if (HBaseSerDe.isSpecialColumn(columnFamily)) {
-        assert(columnQualifier == null);
+      if (colMap.hbaseRowKey) {
         ref = new ByteArrayRef();
         ref.setData(result.getRow());
       } else {
-        if (columnQualifier == null) {
+        if (colMap.qualifierName == null) {
           // it is a column family
-          ((LazyHBaseCellMap) getFields()[fieldID]).init(result, columnFamilyBytes);
+          // primitive type for Map<Key, Value> can be stored in binary format
+          ((LazyHBaseCellMap) fields[fieldID]).init(
+              result, colMap.familyNameBytes, colMap.binaryStorage);
         } else {
           // it is a column i.e. a column-family with column-qualifier
-          byte [] res = result.getValue(columnFamilyBytes, columnQualifierBytes);
+          byte [] res = result.getValue(colMap.familyNameBytes, colMap.qualifierNameBytes);
 
           if (res == null) {
             return null;
@@ -160,11 +160,11 @@ public class LazyHBaseRow extends LazySt
       }
 
       if (ref != null) {
-        getFields()[fieldID].init(ref, 0, ref.getData().length);
+        fields[fieldID].init(ref, 0, ref.getData().length);
       }
     }
 
-    return getFields()[fieldID].getObject();
+    return fields[fieldID].getObject();
   }
 
   /**
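
Putting the read path together, a short sketch: it assumes 'serDe' was initialized as in the earlier sketch, 'result' is an HBase Result fetched elsewhere, and the field name 'count' is hypothetical:

    Object row = serDe.deserialize(result); // a LazyHBaseRow backed by 'result'
    StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();
    // Field data materializes lazily: a binary-stored primitive column is only
    // decoded (through its LazyDio* class) when the field is actually accessed.
    Object countField = soi.getStructFieldData(row, soi.getStructFieldRef("count"));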

Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java Fri Mar  9 00:47:37 2012
@@ -21,16 +21,23 @@ package org.apache.hadoop.hive.hbase;
 import java.io.File;
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.util.Arrays;
 
 import junit.extensions.TestSetup;
 import junit.framework.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.zookeeper.Watcher;
@@ -62,8 +69,7 @@ public class HBaseTestSetup extends Test
     }
     conf.set("hbase.rootdir", hbaseRoot);
     conf.set("hbase.master", hbaseCluster.getHMasterAddress().toString());
-    conf.set("hbase.zookeeper.property.clientPort",
-      Integer.toString(zooKeeperPort));
+    conf.set("hbase.zookeeper.property.clientPort", Integer.toString(zooKeeperPort));
     String auxJars = conf.getAuxJars();
     auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://"
       + new JobConf(conf, HBaseConfiguration.class).getJar();
@@ -82,7 +88,7 @@ public class HBaseTestSetup extends Test
       new File(tmpdir, "zookeeper"));
     conf.set("hbase.zookeeper.property.clientPort",
       Integer.toString(zooKeeperPort));
-    HBaseConfiguration hbaseConf = new HBaseConfiguration(conf);
+    Configuration hbaseConf = HBaseConfiguration.create(conf);
     hbaseConf.setInt("hbase.master.port", findFreePort());
     hbaseConf.setInt("hbase.master.info.port", -1);
     hbaseConf.setInt("hbase.regionserver.port", findFreePort());
@@ -90,7 +96,49 @@ public class HBaseTestSetup extends Test
     hbaseCluster = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS);
     conf.set("hbase.master", hbaseCluster.getHMasterAddress().toString());
     // opening the META table ensures that the cluster is running
-    new HTable(new HBaseConfiguration(conf), HConstants.META_TABLE_NAME);
+    new HTable(hbaseConf, HConstants.META_TABLE_NAME);
+    createHBaseTable(hbaseConf);
+  }
+
+  private void createHBaseTable(Configuration hbaseConf) throws IOException {
+    final String HBASE_TABLE_NAME = "HiveExternalTable";
+    HTableDescriptor htableDesc = new HTableDescriptor(HBASE_TABLE_NAME.getBytes());
+    HColumnDescriptor hcolDesc = new HColumnDescriptor("cf".getBytes());
+    htableDesc.addFamily(hcolDesc);
+    HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConf);
+    if (Arrays.asList(hbaseAdmin.listTables()).contains(htableDesc)) {
+      // if the table already exists, don't recreate it
+      return;
+    }
+    hbaseAdmin.createTable(htableDesc);
+    HTable htable = new HTable(hbaseConf, HBASE_TABLE_NAME);
+
+    // data
+    Put [] puts = new Put [] {
+        new Put("key-1".getBytes()), new Put("key-2".getBytes()), new Put("key-3".getBytes()) };
+
+    boolean [] booleans = new boolean [] { true, false, true };
+    byte [] bytes = new byte [] { Byte.MIN_VALUE, -1, Byte.MAX_VALUE };
+    short [] shorts = new short [] { Short.MIN_VALUE, -1, Short.MAX_VALUE };
+    int [] ints = new int [] { Integer.MIN_VALUE, -1, Integer.MAX_VALUE };
+    long [] longs = new long [] { Long.MIN_VALUE, -1, Long.MAX_VALUE };
+    String [] strings = new String [] { "Hadoop, HBase,", "Hive", "Test Strings" };
+    float [] floats = new float [] { Float.MIN_VALUE, -1.0F, Float.MAX_VALUE };
+    double [] doubles = new double [] { Double.MIN_VALUE, -1.0, Double.MAX_VALUE };
+
+    // store data
+    for (int i = 0; i < puts.length; i++) {
+      puts[i].add("cf".getBytes(), "cq-boolean".getBytes(), Bytes.toBytes(booleans[i]));
+      puts[i].add("cf".getBytes(), "cq-byte".getBytes(), new byte [] { bytes[i] });
+      puts[i].add("cf".getBytes(), "cq-short".getBytes(), Bytes.toBytes(shorts[i]));
+      puts[i].add("cf".getBytes(), "cq-int".getBytes(), Bytes.toBytes(ints[i]));
+      puts[i].add("cf".getBytes(), "cq-long".getBytes(), Bytes.toBytes(longs[i]));
+      puts[i].add("cf".getBytes(), "cq-string".getBytes(), Bytes.toBytes(strings[i]));
+      puts[i].add("cf".getBytes(), "cq-float".getBytes(), Bytes.toBytes(floats[i]));
+      puts[i].add("cf".getBytes(), "cq-double".getBytes(), Bytes.toBytes(doubles[i]));
+
+      htable.put(puts[i]);
+    }
   }
 
   private static int findFreePort() throws IOException {

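A hedged sketch of reading the fixture rows back outside Hive, decoding with
the same Bytes methods used to write them above; obtaining the Configuration
via HBaseConfiguration.create() with cluster settings on the classpath is an
assumption (the test itself would reuse hbaseConf):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReadHiveExternalTable {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create(); // assumed: cluster settings on classpath
        HTable htable = new HTable(conf, "HiveExternalTable");
        ResultScanner scanner = htable.getScanner(new Scan());
        try {
          for (Result r : scanner) {
            // decode a binary cell written by createHBaseTable()
            long l = Bytes.toLong(r.getValue("cf".getBytes(), "cq-long".getBytes()));
            System.out.println(Bytes.toString(r.getRow()) + " -> " + l);
          }
        } finally {
          scanner.close();
        }
      }
    }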
Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=1298673&r1=1298672&r2=1298673&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Fri Mar  9 00:47:37 2012
@@ -50,14 +50,9 @@ import org.apache.hadoop.io.Text;
 public class TestHBaseSerDe extends TestCase {
 
   /**
-   * Test the LazySimpleSerDe class.
+   * Test the default behavior of the Lazy family of objects and object inspectors.
    */
-  public void testHBaseSerDe() throws SerDeException {
-    // Create the SerDe
-    HBaseSerDe serDe = new HBaseSerDe();
-    Configuration conf = new Configuration();
-    Properties tbl = createProperties();
-    serDe.initialize(conf, tbl);
+  public void testHBaseSerDeI() throws SerDeException {
 
     byte [] cfa = "cola".getBytes();
     byte [] cfb = "colb".getBytes();
@@ -112,6 +107,33 @@ public class TestHBaseSerDe extends Test
       new BooleanWritable(true)
     };
 
+    // Create, initialize, and test the SerDe
+    HBaseSerDe serDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createPropertiesI_I();
+    serDe.initialize(conf, tbl);
+
+    deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+    serDe = new HBaseSerDe();
+    conf = new Configuration();
+    tbl = createPropertiesI_II();
+    serDe.initialize(conf, tbl);
+
+    deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+    serDe = new HBaseSerDe();
+    conf = new Configuration();
+    tbl = createPropertiesI_III();
+    serDe.initialize(conf, tbl);
+
+    deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+    serDe = new HBaseSerDe();
+    conf = new Configuration();
+    tbl = createPropertiesI_IV();
+    serDe.initialize(conf, tbl);
+
     deserializeAndSerialize(serDe, r, p, expectedFieldsData);
   }
 
@@ -119,7 +141,7 @@ public class TestHBaseSerDe extends Test
     // Create the SerDe
     HBaseSerDe serDe = new HBaseSerDe();
     Configuration conf = new Configuration();
-    Properties tbl = createProperties();
+    Properties tbl = createPropertiesI_I();
     long putTimestamp = 1;
     tbl.setProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,
             Long.toString(putTimestamp));
@@ -207,7 +229,8 @@ public class TestHBaseSerDe extends Test
     assertEquals("Serialized data", p.toString(), serializedPut.toString());
   }
 
-  private Properties createProperties() {
+  // With no storage specification, columns default to UTF8 string storage for backwards compatibility
+  private Properties createPropertiesI_I() {
     Properties tbl = new Properties();
 
     // Set the configuration parameters
@@ -219,4 +242,450 @@ public class TestHBaseSerDe extends Test
         "cola:byte,colb:short,colc:int,cola:long,colb:float,colc:double,cola:string,colb:boolean");
     return tbl;
   }
+
+  // The default ('-') column storage specification inherits from the table-level default
+  // (in this case a missing specification), i.e. UTF8 string storage
+  private Properties createPropertiesI_II() {
+    Properties tbl = new Properties();
+
+    // Set the configuration parameters
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+    tbl.setProperty("columns.types",
+        "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#-,cola:byte#s,colb:short#-,colc:int#s,cola:long#s,colb:float#-,colc:double#-," +
+        "cola:string#s,colb:boolean#s");
+    return tbl;
+  }
+
+  // An explicit string storage type overrides the table-level default of binary storage
+  private Properties createPropertiesI_III() {
+    Properties tbl = new Properties();
+
+    // Set the configuration parameters
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+    tbl.setProperty("columns.types",
+        "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#s,cola:byte#s,colb:short#s,colc:int#s,cola:long#s,colb:float#s,colc:double#s," +
+        "cola:string#s,colb:boolean#s");
+    tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+    return tbl;
+  }
+
+  // The string type is never stored as anything other than an escaped string;
+  // a specification of binary storage should not affect ser/de.
+  private Properties createPropertiesI_IV() {
+    Properties tbl = new Properties();
+
+    // Set the configuration parameters
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+    tbl.setProperty("columns.types",
+        "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#-,cola:byte#s,colb:short#s,colc:int#s,cola:long#s,colb:float#s,colc:double#s," +
+        "cola:string#b,colb:boolean#s");
+    tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+    return tbl;
+  }
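+
+  // Summary of the storage specifier grammar exercised by the four property
+  // sets above: each hbase.columns.mapping entry may end in '#' plus one of
+  //   s - UTF8 string storage
+  //   b - binary storage
+  //   - - inherit the table-level default (HBASE_TABLE_DEFAULT_STORAGE_TYPE),
+  //       which itself falls back to string storage when unset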
+
+  public void testHBaseSerDeII() throws SerDeException {
+
+    byte [] cfa = "cfa".getBytes();
+    byte [] cfb = "cfb".getBytes();
+    byte [] cfc = "cfc".getBytes();
+
+    byte [] qualByte = "byte".getBytes();
+    byte [] qualShort = "short".getBytes();
+    byte [] qualInt = "int".getBytes();
+    byte [] qualLong = "long".getBytes();
+    byte [] qualFloat = "float".getBytes();
+    byte [] qualDouble = "double".getBytes();
+    byte [] qualString = "string".getBytes();
+    byte [] qualBool = "boolean".getBytes();
+
+    byte [] rowKey = Bytes.toBytes("test-row-2");
+
+    // Data
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    kvs.add(new KeyValue(rowKey, cfa, qualByte, new byte [] { Byte.MIN_VALUE }));
+    kvs.add(new KeyValue(rowKey, cfb, qualShort, Bytes.toBytes(Short.MIN_VALUE)));
+    kvs.add(new KeyValue(rowKey, cfc, qualInt, Bytes.toBytes(Integer.MIN_VALUE)));
+    kvs.add(new KeyValue(rowKey, cfa, qualLong, Bytes.toBytes(Long.MIN_VALUE)));
+    kvs.add(new KeyValue(rowKey, cfb, qualFloat, Bytes.toBytes(Float.MIN_VALUE)));
+    kvs.add(new KeyValue(rowKey, cfc, qualDouble, Bytes.toBytes(Double.MAX_VALUE)));
+    kvs.add(new KeyValue(rowKey, cfa, qualString, Bytes.toBytes(
+      "Hadoop, HBase, and Hive Again!")));
+    kvs.add(new KeyValue(rowKey, cfb, qualBool, Bytes.toBytes(false)));
+
+    Collections.sort(kvs, KeyValue.COMPARATOR);
+    Result r = new Result(kvs);
+
+    Put p = new Put(rowKey);
+
+    p.add(cfa, qualByte, new byte [] { Byte.MIN_VALUE });
+    p.add(cfb, qualShort, Bytes.toBytes(Short.MIN_VALUE));
+    p.add(cfc, qualInt, Bytes.toBytes(Integer.MIN_VALUE));
+    p.add(cfa, qualLong, Bytes.toBytes(Long.MIN_VALUE));
+    p.add(cfb, qualFloat, Bytes.toBytes(Float.MIN_VALUE));
+    p.add(cfc, qualDouble, Bytes.toBytes(Double.MAX_VALUE));
+    p.add(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive Again!"));
+    p.add(cfb, qualBool, Bytes.toBytes(false));
+
+    Object[] expectedFieldsData = {
+      new Text("test-row-2"),
+      new ByteWritable(Byte.MIN_VALUE),
+      new ShortWritable(Short.MIN_VALUE),
+      new IntWritable(Integer.MIN_VALUE),
+      new LongWritable(Long.MIN_VALUE),
+      new FloatWritable(Float.MIN_VALUE),
+      new DoubleWritable(Double.MAX_VALUE),
+      new Text("Hadoop, HBase, and Hive Again!"),
+      new BooleanWritable(false)
+    };
+
+    // Create, initialize, and test the SerDe
+    HBaseSerDe serDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createPropertiesII_I();
+    serDe.initialize(conf, tbl);
+
+    deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+    serDe = new HBaseSerDe();
+    conf = new Configuration();
+    tbl = createPropertiesII_II();
+    serDe.initialize(conf, tbl);
+
+    deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+
+    serDe = new HBaseSerDe();
+    conf = new Configuration();
+    tbl = createPropertiesII_III();
+    serDe.initialize(conf, tbl);
+
+    deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+  }
+
+  private Properties createPropertiesII_I() {
+    Properties tbl = new Properties();
+
+    // Set the configuration parameters
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+    tbl.setProperty("columns.types",
+        "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#-,cfa:byte#b,cfb:short#b,cfc:int#-,cfa:long#b,cfb:float#-,cfc:double#b," +
+        "cfa:string#b,cfb:boolean#-");
+    tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+    return tbl;
+  }
+
+  private Properties createPropertiesII_II() {
+    Properties tbl = new Properties();
+
+    // Set the configuration parameters
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+    tbl.setProperty("columns.types",
+        "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#b,cfa:byte#b,cfb:short#b,cfc:int#b,cfa:long#b,cfb:float#b,cfc:double#b," +
+        "cfa:string#b,cfb:boolean#b");
+    tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string");
+    return tbl;
+  }
+
+  private Properties createPropertiesII_III() {
+    Properties tbl = new Properties();
+
+    // Set the configuration parameters
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
+    tbl.setProperty("columns.types",
+        "string,tinyint:smallint:int:bigint:float:double:string:boolean");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#-,cfa:byte#b,cfb:short#b,cfc:int#b,cfa:long#b,cfb:float#b,cfc:double#b," +
+        "cfa:string#-,cfb:boolean#b");
+    return tbl;
+  }
+
+  public void testHBaseSerDeWithHiveMapToHBaseColumnFamily() throws SerDeException {
+
+    byte [] cfint = "cf-int".getBytes();
+    byte [] cfbyte = "cf-byte".getBytes();
+    byte [] cfshort = "cf-short".getBytes();
+    byte [] cflong = "cf-long".getBytes();
+    byte [] cffloat = "cf-float".getBytes();
+    byte [] cfdouble = "cf-double".getBytes();
+    byte [] cfbool = "cf-bool".getBytes();
+
+    byte [][] columnFamilies =
+      new byte [][] {cfint, cfbyte, cfshort, cflong, cffloat, cfdouble, cfbool};
+
+    byte [][] rowKeys = new byte [][] {
+        Integer.toString(1).getBytes(),
+        Integer.toString(Integer.MIN_VALUE).getBytes(),
+        Integer.toString(Integer.MAX_VALUE).getBytes()
+    };
+
+    byte [][][] columnQualifiersAndValues = new byte [][][] {
+        {Bytes.toBytes(1), new byte [] {1}, Bytes.toBytes((short) 1),
+         Bytes.toBytes((long) 1), Bytes.toBytes((float) 1.0F), Bytes.toBytes(1.0),
+         Bytes.toBytes(true)},
+        {Bytes.toBytes(Integer.MIN_VALUE), new byte [] {Byte.MIN_VALUE},
+         Bytes.toBytes((short) Short.MIN_VALUE), Bytes.toBytes((long) Long.MIN_VALUE),
+         Bytes.toBytes((float) Float.MIN_VALUE), Bytes.toBytes(Double.MIN_VALUE),
+         Bytes.toBytes(false)},
+        {Bytes.toBytes(Integer.MAX_VALUE), new byte [] {Byte.MAX_VALUE},
+         Bytes.toBytes((short) Short.MAX_VALUE), Bytes.toBytes((long) Long.MAX_VALUE),
+         Bytes.toBytes((float) Float.MAX_VALUE), Bytes.toBytes(Double.MAX_VALUE),
+         Bytes.toBytes(true)}
+    };
+
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    Result [] r = new Result [] {null, null, null};
+    Put [] p = new Put [] {null, null, null};
+
+    for (int i = 0; i < r.length; i++) {
+      kvs.clear();
+      p[i] = new Put(rowKeys[i]);
+
+      for (int j = 0; j < columnQualifiersAndValues[i].length; j++) {
+        kvs.add(new KeyValue(rowKeys[i], columnFamilies[j], columnQualifiersAndValues[i][j],
+            columnQualifiersAndValues[i][j]));
+        p[i].add(columnFamilies[j], columnQualifiersAndValues[i][j],
+            columnQualifiersAndValues[i][j]);
+      }
+
+      r[i] = new Result(kvs);
+    }
+
+    Object [][] expectedData = {
+        {new Text(Integer.toString(1)), new IntWritable(1), new ByteWritable((byte) 1),
+         new ShortWritable((short) 1), new LongWritable(1), new FloatWritable(1.0F),
+         new DoubleWritable(1.0), new BooleanWritable(true)},
+        {new Text(Integer.toString(Integer.MIN_VALUE)), new IntWritable(Integer.MIN_VALUE),
+         new ByteWritable(Byte.MIN_VALUE), new ShortWritable(Short.MIN_VALUE),
+         new LongWritable(Long.MIN_VALUE), new FloatWritable(Float.MIN_VALUE),
+         new DoubleWritable(Double.MIN_VALUE), new BooleanWritable(false)},
+        {new Text(Integer.toString(Integer.MAX_VALUE)), new IntWritable(Integer.MAX_VALUE),
+         new ByteWritable(Byte.MAX_VALUE), new ShortWritable(Short.MAX_VALUE),
+         new LongWritable(Long.MAX_VALUE), new FloatWritable(Float.MAX_VALUE),
+         new DoubleWritable(Double.MAX_VALUE), new BooleanWritable(true)}};
+
+    HBaseSerDe hbaseSerDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createPropertiesForHiveMapHBaseColumnFamily();
+    hbaseSerDe.initialize(conf, tbl);
+
+    deserializeAndSerializeHiveMapHBaseColumnFamily(hbaseSerDe, r, p, expectedData, rowKeys,
+        columnFamilies, columnQualifiersAndValues);
+
+    hbaseSerDe = new HBaseSerDe();
+    conf = new Configuration();
+    tbl = createPropertiesForHiveMapHBaseColumnFamilyII();
+    hbaseSerDe.initialize(conf, tbl);
+
+    deserializeAndSerializeHiveMapHBaseColumnFamily(hbaseSerDe, r, p, expectedData, rowKeys,
+        columnFamilies, columnQualifiersAndValues);
+  }
+
+  private void deserializeAndSerializeHiveMapHBaseColumnFamily(
+      HBaseSerDe hbaseSerDe,
+      Result [] r,
+      Put [] p,
+      Object [][] expectedData,
+      byte [][] rowKeys,
+      byte [][] columnFamilies,
+      byte [][][] columnQualifiersAndValues) throws SerDeException {
+
+    StructObjectInspector soi = (StructObjectInspector) hbaseSerDe.getObjectInspector();
+    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+    assertEquals(8, fieldRefs.size());
+
+    // Deserialize
+    for (int i = 0; i < r.length; i++) {
+      Object row = hbaseSerDe.deserialize(r[i]);
+      Put serializedPut = (Put) hbaseSerDe.serialize(row, soi);
+      byte [] rowKey = serializedPut.getRow();
+
+      for (int k = 0; k < rowKey.length; k++) {
+        assertEquals(rowKey[k], rowKeys[i][k]);
+      }
+
+      assertEquals(columnFamilies.length, serializedPut.numFamilies());
+
+      for (int j = 0; j < fieldRefs.size(); j++) {
+        Object fieldData = soi.getStructFieldData(row, fieldRefs.get(j));
+
+        assertNotNull(fieldData);
+
+        if (fieldData instanceof LazyPrimitive<?, ?>) {
+          assertEquals(expectedData[i][j],
+              ((LazyPrimitive<?, ?>) fieldData).getWritableObject());
+        } else if (fieldData instanceof LazyHBaseCellMap) {
+          LazyPrimitive<?, ?> lazyPrimitive = (LazyPrimitive<?, ?>)
+              ((LazyHBaseCellMap) fieldData).getMapValueElement(expectedData[i][j]);
+          assertEquals(expectedData[i][j], lazyPrimitive.getWritableObject());
+        } else {
+          fail("Error: field data not an instance of LazyPrimitive<?,?> or LazyMap");
+        }
+      }
+    }
+  }
+
+  private Properties createPropertiesForHiveMapHBaseColumnFamily() {
+    Properties tbl = new Properties();
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty(Constants.LIST_COLUMNS,
+        "key,valint,valbyte,valshort,vallong,valfloat,valdouble,valbool");
+    tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+        "string:map<int,int>:map<tinyint,tinyint>:map<smallint,smallint>:map<bigint,bigint>:"
+        + "map<float,float>:map<double,double>:map<boolean,boolean>");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#-,cf-int:#b:b,cf-byte:#b:b,cf-short:#b:b,cf-long:#b:b,cf-float:#b:b,cf-double:#b:b," +
+        "cf-bool:#b:b");
+    return tbl;
+  }
+
+  private Properties createPropertiesForHiveMapHBaseColumnFamilyII() {
+    Properties tbl = new Properties();
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty(Constants.LIST_COLUMNS,
+        "key,valint,valbyte,valshort,vallong,valfloat,valdouble,valbool");
+    tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+        "string:map<int,int>:map<tinyint,tinyint>:map<smallint,smallint>:map<bigint,bigint>:"
+        + "map<float,float>:map<double,double>:map<boolean,boolean>");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#-,cf-int:#-:-,cf-byte:#-:-,cf-short:#-:-,cf-long:#-:-,cf-float:#-:-,cf-double:#-:-," +
+        "cf-bool:#-:-");
+    tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+    return tbl;
+  }
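+
+  // Note on the two-specifier form used above: for a Hive column of map type
+  // mapped to an entire column family (e.g. "cf-int:#b:b"), the first
+  // specifier applies to the map key (the HBase column qualifier) and the
+  // second to the map value (the cell contents).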
+
+  public void testHBaseSerDeWithHiveMapToHBaseColumnFamilyII() throws SerDeException {
+
+    byte [] cfbyte = "cf-byte".getBytes();
+    byte [] cfshort = "cf-short".getBytes();
+    byte [] cfint = "cf-int".getBytes();
+    byte [] cflong = "cf-long".getBytes();
+    byte [] cffloat = "cf-float".getBytes();
+    byte [] cfdouble = "cf-double".getBytes();
+    byte [] cfstring = "cf-string".getBytes();
+    byte [] cfbool = "cf-bool".getBytes();
+
+    byte [][] columnFamilies =
+      new byte [][] {cfbyte, cfshort, cfint, cflong, cffloat, cfdouble, cfstring, cfbool};
+
+    byte [] rowKey = Bytes.toBytes("row-key");
+
+    byte [][] columnQualifiersAndValues = new byte [][] {
+        Bytes.toBytes("123"), Bytes.toBytes("456"), Bytes.toBytes("789"), Bytes.toBytes("1000"),
+        Bytes.toBytes("-0.01"), Bytes.toBytes("5.3"), Bytes.toBytes("Hive"),
+        Bytes.toBytes("true")
+    };
+
+    Put p = new Put(rowKey);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    for (int j = 0; j < columnQualifiersAndValues.length; j++) {
+      kvs.add(new KeyValue(rowKey,
+          columnFamilies[j], columnQualifiersAndValues[j], columnQualifiersAndValues[j]));
+      p.add(columnFamilies[j], columnQualifiersAndValues[j], columnQualifiersAndValues[j]);
+    }
+
+    Result r = new Result(kvs);
+
+    Object [] expectedData = {
+        new Text("row-key"), new ByteWritable((byte) 123), new ShortWritable((short) 456),
+        new IntWritable(789), new LongWritable(1000), new FloatWritable(-0.01F),
+        new DoubleWritable(5.3), new Text("Hive"), new BooleanWritable(true)
+    };
+
+    HBaseSerDe hbaseSerDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createPropertiesForHiveMapHBaseColumnFamilyII_I();
+    hbaseSerDe.initialize(conf, tbl);
+
+    deserializeAndSerializeHiveMapHBaseColumnFamilyII(hbaseSerDe, r, p, expectedData,
+        columnFamilies, columnQualifiersAndValues);
+
+    hbaseSerDe = new HBaseSerDe();
+    conf = new Configuration();
+    tbl = createPropertiesForHiveMapHBaseColumnFamilyII_II();
+    hbaseSerDe.initialize(conf, tbl);
+
+    deserializeAndSerializeHiveMapHBaseColumnFamilyII(hbaseSerDe, r, p, expectedData,
+        columnFamilies, columnQualifiersAndValues);
+  }
+
+  private Properties createPropertiesForHiveMapHBaseColumnFamilyII_I() {
+    Properties tbl = new Properties();
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty(Constants.LIST_COLUMNS,
+        "key,valbyte,valshort,valint,vallong,valfloat,valdouble,valstring,valbool");
+    tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+        "string:map<tinyint,tinyint>:map<smallint,smallint>:map<int,int>:map<bigint,bigint>:"
+        + "map<float,float>:map<double,double>:map<string,string>:map<boolean,boolean>");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#s,cf-byte:#-:s,cf-short:#s:-,cf-int:#s:s,cf-long:#-:-,cf-float:#s:-,cf-double:#-:s," +
+        "cf-string:#s:s,cf-bool:#-:-");
+    return tbl;
+  }
+
+  private Properties createPropertiesForHiveMapHBaseColumnFamilyII_II() {
+    Properties tbl = new Properties();
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty(Constants.LIST_COLUMNS,
+        "key,valbyte,valshort,valint,vallong,valfloat,valdouble,valstring,valbool");
+    tbl.setProperty(Constants.LIST_COLUMN_TYPES,
+        "string:map<tinyint,tinyint>:map<smallint,smallint>:map<int,int>:map<bigint,bigint>:"
+        + "map<float,float>:map<double,double>:map<string,string>:map<boolean,boolean>");
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+        ":key#s,cf-byte:#s:s,cf-short:#s:s,cf-int:#s:s,cf-long:#s:s,cf-float:#s:s,cf-double:#s:s," +
+        "cf-string:#s:s,cf-bool:#s:s");
+    tbl.setProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "binary");
+    return tbl;
+  }
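+
+  // Both property sets above resolve every entry to string storage (the first
+  // through the unset table-level default, the second through explicit '#s'
+  // entries overriding the binary default), so the int 123 is written as the
+  // UTF8 text "123" ({ 49, 50, 51 }) rather than the four-byte big-endian
+  // encoding { 0, 0, 0, 123 } that Bytes.toBytes(123) would produce.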
+
+  private void deserializeAndSerializeHiveMapHBaseColumnFamilyII(
+      HBaseSerDe hbaseSerDe,
+      Result r,
+      Put p,
+      Object [] expectedData,
+      byte [][] columnFamilies,
+      byte [][] columnQualifiersAndValues) throws SerDeException {
+
+    StructObjectInspector soi = (StructObjectInspector) hbaseSerDe.getObjectInspector();
+    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+    assertEquals(9, fieldRefs.size());
+
+    // Deserialize
+    Object row = hbaseSerDe.deserialize(r);
+
+    for (int j = 0; j < fieldRefs.size(); j++) {
+      Object fieldData = soi.getStructFieldData(row, fieldRefs.get(j));
+      assertNotNull(fieldData);
+
+      if (fieldData instanceof LazyPrimitive<?, ?>) {
+        assertEquals(expectedData[j], ((LazyPrimitive<?, ?>) fieldData).getWritableObject());
+      } else if (fieldData instanceof LazyHBaseCellMap) {
+        LazyPrimitive<?, ?> lazyPrimitive = (LazyPrimitive<?, ?>)
+          ((LazyHBaseCellMap) fieldData).getMapValueElement(expectedData[j]);
+        assertEquals(expectedData[j], lazyPrimitive.getWritableObject());
+      } else {
+        fail("Error: field data not an instance of LazyPrimitive<?, ?> or LazyHBaseCellMap");
+      }
+    }
+
+    // Serialize
+    Put serializedPut = (Put) hbaseSerDe.serialize(row, soi);
+    assertEquals("Serialized data: ", p.toString(), serializedPut.toString());
+  }
 }