Posted to commits@hive.apache.org by jv...@apache.org on 2010/07/22 21:13:56 UTC

svn commit: r966811 - in /hadoop/hive/trunk: ./ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/org/apache/hadoop/hive/hbase/

Author: jvs
Date: Thu Jul 22 19:13:56 2010
New Revision: 966811

URL: http://svn.apache.org/viewvc?rev=966811&view=rev
Log:
HIVE-1229. Replace dependencies on HBase deprecated API.
(Basab Maulik via jvs)
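
For readers skimming the diff below: the patch moves the HBase storage handler off the
deprecated org.apache.hadoop.hbase.io.BatchUpdate / RowResult classes and onto the
org.apache.hadoop.hbase.client.Put / Result API. A minimal sketch of the new read/write
path follows; the table name "demo", the family "cf", and the values are hypothetical
and are not part of this commit.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutResultSketch {
      public static void main(String[] args) throws Exception {
        // Requires a reachable HBase instance; "demo" is an illustrative table.
        HTable table = new HTable(new HBaseConfiguration(), "demo");

        // Write path: Put replaces the deprecated BatchUpdate.
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("value"));
        table.put(put);

        // Read path: Result replaces the deprecated RowResult.
        Result result = table.get(new Get(Bytes.toBytes("row1")));
        byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"));
        System.out.println(Bytes.toString(value));
      }
    }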


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Jul 22 19:13:56 2010
@@ -31,13 +31,16 @@ Trunk -  Unreleased
     case
     (Siying Dong via Ning Zhang)
 
-    HIVE-1469. replace isArray() calls and remove LOG.isInfoEnabled() in 
+    HIVE-1469. replace isArray() calls and remove LOG.isInfoEnabled() in
     Operator.forward()
     (Yongqiang He via Ning Zhang)
 
     HIVE-1463. Hive output file names are unnecessarily large
     (Joydeep Sen Sarma via Ning Zhang)
 
+    HIVE-1229. Replace dependencies on HBase deprecated API.
+    (Basab Maulik via jvs)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Thu Jul 22 19:13:56 2010
@@ -28,8 +28,8 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.ByteStream;
@@ -52,8 +52,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -61,24 +59,29 @@ import org.apache.hadoop.io.Writable;
  * deserialize objects from an HBase table.
  */
 public class HBaseSerDe implements SerDe {
-  
-  public static final String HBASE_COL_MAPPING = "hbase.columns.mapping";
-  
+
+  public static final String HBASE_COLUMNS_MAPPING = "hbase.columns.mapping";
   public static final String HBASE_TABLE_NAME = "hbase.table.name";
-  
   public static final String HBASE_KEY_COL = ":key";
-  
-  public static final Log LOG = LogFactory.getLog(
-      HBaseSerDe.class.getName());
+  public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
 
   private ObjectInspector cachedObjectInspector;
   private List<String> hbaseColumnNames;
+  private List<byte []> hbaseColumnNamesBytes;
   private SerDeParameters serdeParams;
   private boolean useJSONSerialize;
   private LazyHBaseRow cachedHBaseRow;
-  private ByteStream.Output serializeStream = new ByteStream.Output();
+  private final ByteStream.Output serializeStream = new ByteStream.Output();
   private int iKey;
 
+  // used for serializing a field
+  private byte [] separators;     // the separators array
+  private boolean escaped;        // whether we need to escape the data when writing out
+  private byte escapeChar;        // which char to use as the escape char, e.g. '\\'
+  private boolean [] needsEscape; // which chars need to be escaped. This array should have size
+                                  // of 128. Negative byte values (or byte values >= 128) are
+                                  // never escaped.
+  @Override
   public String toString() {
     return getClass().toString()
         + "["
@@ -90,32 +93,33 @@ public class HBaseSerDe implements SerDe
         + ((StructTypeInfo) serdeParams.getRowTypeInfo())
             .getAllStructFieldTypeInfos() + "]";
   }
-  
+
   public HBaseSerDe() throws SerDeException {
   }
-  
+
   /**
    * Initialize the SerDe given parameters.
    * @see SerDe#initialize(Configuration, Properties)
    */
+  @Override
   public void initialize(Configuration conf, Properties tbl)
       throws SerDeException {
 
-    initHBaseSerDeParameters(conf, tbl, 
+    initHBaseSerDeParameters(conf, tbl,
         getClass().getName());
-    
+
     cachedObjectInspector = LazyFactory.createLazyStructInspector(
-        serdeParams.getColumnNames(), 
-        serdeParams.getColumnTypes(), 
+        serdeParams.getColumnNames(),
+        serdeParams.getColumnTypes(),
         serdeParams.getSeparators(),
         serdeParams.getNullSequence(),
         serdeParams.isLastColumnTakesRest(),
         serdeParams.isEscaped(),
-        serdeParams.getEscapeChar()); 
-    
+        serdeParams.getEscapeChar());
+
     cachedHBaseRow = new LazyHBaseRow(
       (LazySimpleStructObjectInspector) cachedObjectInspector);
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("HBaseSerDe initialized with : columnNames = "
         + serdeParams.getColumnNames()
@@ -136,25 +140,45 @@ public class HBaseSerDe implements SerDe
     }
     return columnList;
   }
-  
+
+  public static List<byte []> initColumnNamesBytes(List<String> columnNames) {
+    List<byte []> columnBytes = new ArrayList<byte []>();
+    String column = null;
+
+    for (int i = 0; i < columnNames.size(); i++) {
+      column = columnNames.get(i);
+
+      if (column.endsWith(":")) {
+        columnBytes.add(Bytes.toBytes(column.split(":")[0]));
+      } else {
+        columnBytes.add(Bytes.toBytes(column));
+      }
+    }
+
+    return columnBytes;
+  }
+
   public static boolean isSpecialColumn(String hbaseColumnName) {
     return hbaseColumnName.equals(HBASE_KEY_COL);
   }
-  
+
   private void initHBaseSerDeParameters(
-      Configuration job, Properties tbl, String serdeName) 
+      Configuration job, Properties tbl, String serdeName)
     throws SerDeException {
 
     // Read configuration parameters
     String hbaseColumnNameProperty =
-      tbl.getProperty(HBaseSerDe.HBASE_COL_MAPPING);
+      tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
     String columnTypeProperty =
       tbl.getProperty(Constants.LIST_COLUMN_TYPES);
-    
-    // Initialize the hbase column list
+
+    // Initialize the HBase column list
     hbaseColumnNames = parseColumnMapping(hbaseColumnNameProperty);
     iKey = hbaseColumnNames.indexOf(HBASE_KEY_COL);
-      
+
+    // initialize the byte [] corresponding to each column name
+    hbaseColumnNamesBytes = initColumnNamesBytes(hbaseColumnNames);
+
     // Build the type property string if not supplied
     if (columnTypeProperty == null) {
       StringBuilder sb = new StringBuilder();
@@ -180,20 +204,24 @@ public class HBaseSerDe implements SerDe
       }
       tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString());
     }
-    
-    serdeParams = LazySimpleSerDe.initSerdeParams(
-      job, tbl, serdeName);
-    
+
+    serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
+
     if (hbaseColumnNames.size() != serdeParams.getColumnNames().size()) {
-      throw new SerDeException(serdeName + ": columns has " + 
-        serdeParams.getColumnNames().size() + 
-        " elements while hbase.columns.mapping has " + 
+      throw new SerDeException(serdeName + ": columns has " +
+        serdeParams.getColumnNames().size() +
+        " elements while hbase.columns.mapping has " +
         hbaseColumnNames.size() + " elements" +
         " (counting the key if implicit)");
     }
-    
+
+    separators = serdeParams.getSeparators();
+    escaped = serdeParams.isEscaped();
+    escapeChar = serdeParams.getEscapeChar();
+    needsEscape = serdeParams.getNeedsEscape();
+
     // check that the mapping schema is right;
-    // we just can make sure that "columnfamily:" is mapped to MAP<String,?> 
+    // we just can make sure that "column-family:" is mapped to MAP<String,?>
     for (int i = 0; i < hbaseColumnNames.size(); i++) {
       String hbaseColName = hbaseColumnNames.get(i);
       if (hbaseColName.endsWith(":")) {
@@ -211,21 +239,22 @@ public class HBaseSerDe implements SerDe
       }
     }
   }
-  
+
   /**
-   * Deserialize a row from the HBase RowResult writable to a LazyObject
-   * @param rowResult the HBase RowResult Writable contain a row
+   * Deserialize a row from the HBase Result writable to a LazyObject
+   * @param result the HBase Result Writable containing the row
    * @return the deserialized object
-   * @see SerDe#deserialize(Writable) 
+   * @see SerDe#deserialize(Writable)
    */
-  public Object deserialize(Writable rowResult) throws SerDeException {
-    
-    if (!(rowResult instanceof RowResult)) {
-      throw new SerDeException(getClass().getName() + ": expects RowResult!");
-    }
-    
-    RowResult rr = (RowResult)rowResult;
-    cachedHBaseRow.init(rr, hbaseColumnNames);
+  @Override
+  public Object deserialize(Writable result) throws SerDeException {
+
+    if (!(result instanceof Result)) {
+      throw new SerDeException(getClass().getName() + ": expects Result!");
+    }
+
+    Result r = (Result)result;
+    cachedHBaseRow.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     return cachedHBaseRow;
   }
 
@@ -236,15 +265,15 @@ public class HBaseSerDe implements SerDe
 
   @Override
   public Class<? extends Writable> getSerializedClass() {
-    return BatchUpdate.class;
+    return Put.class;
   }
 
   @Override
   public Writable serialize(Object obj, ObjectInspector objInspector)
       throws SerDeException {
     if (objInspector.getCategory() != Category.STRUCT) {
-      throw new SerDeException(getClass().toString() 
-          + " can only serialize struct types, but we got: " 
+      throw new SerDeException(getClass().toString()
+          + " can only serialize struct types, but we got: "
           + objInspector.getTypeName());
     }
 
@@ -253,45 +282,48 @@ public class HBaseSerDe implements SerDe
     List<? extends StructField> fields = soi.getAllStructFieldRefs();
     List<Object> list = soi.getStructFieldsDataAsList(obj);
     List<? extends StructField> declaredFields =
-      (serdeParams.getRowTypeInfo() != null && 
+      (serdeParams.getRowTypeInfo() != null &&
         ((StructTypeInfo) serdeParams.getRowTypeInfo())
-        .getAllStructFieldNames().size() > 0) ? 
+        .getAllStructFieldNames().size() > 0) ?
       ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs()
       : null;
-        
-    BatchUpdate batchUpdate;
+
+    Put put = null;
 
     try {
-      byte [] key =
-        serializeField(
-          iKey, HBASE_KEY_COL, null, fields, list, declaredFields);
+      byte [] key = serializeField(iKey, null, fields, list, declaredFields);
+
       if (key == null) {
         throw new SerDeException("HBase row key cannot be NULL");
       }
-      batchUpdate = new BatchUpdate(key);
+
+      put = new Put(key);
+
       // Serialize each field
       for (int i = 0; i < fields.size(); i++) {
         if (i == iKey) {
           // already processed the key above
           continue;
         }
-        String hbaseColumn = hbaseColumnNames.get(i);
-        serializeField(
-          i, hbaseColumn, batchUpdate, fields, list, declaredFields);
+        serializeField(i, put, fields, list, declaredFields);
       }
     } catch (IOException e) {
       throw new SerDeException(e);
     }
-    
-    return batchUpdate;
+
+    return put;
   }
 
   private byte [] serializeField(
-    int i, String hbaseColumn, BatchUpdate batchUpdate,
+    int i,
+    Put put,
     List<? extends StructField> fields,
     List<Object> list,
     List<? extends StructField> declaredFields) throws IOException {
 
+    // column name
+    String hbaseColumn = hbaseColumnNames.get(i);
+
     // Get the field objectInspector and the field object.
     ObjectInspector foi = fields.get(i).getFieldObjectInspector();
     Object f = (list == null ? null : list.get(i));
@@ -300,7 +332,7 @@ public class HBaseSerDe implements SerDe
       // a null object, we do not serialize it
       return null;
     }
-        
+
     // If the field corresponds to a column family in hbase
     if (hbaseColumn.endsWith(":")) {
       MapObjectInspector moi = (MapObjectInspector)foi;
@@ -314,107 +346,73 @@ public class HBaseSerDe implements SerDe
         for (Map.Entry<?, ?> entry: map.entrySet()) {
           // Get the Key
           serializeStream.reset();
-          serialize(serializeStream, entry.getKey(), koi, 
-            serdeParams.getSeparators(), 3,
-            serdeParams.getNullSequence(),
-            serdeParams.isEscaped(),
-            serdeParams.getEscapeChar(),
-            serdeParams.getNeedsEscape());
-              
-          // generate a column name (column_family:column_name)
-          String hbaseSparseColumn =
-            hbaseColumn + Bytes.toString(
-              serializeStream.getData(), 0, serializeStream.getCount());
+          serialize(entry.getKey(), koi, 3);
+
+          // Get the column-qualifier
+          byte [] columnQualifier = new byte[serializeStream.getCount()];
+          System.arraycopy(serializeStream.getData(), 0, columnQualifier, 0, serializeStream.getCount());
 
           // Get the Value
           serializeStream.reset();
-
-          boolean isNotNull = serialize(serializeStream, entry.getValue(), voi, 
-            serdeParams.getSeparators(), 3,
-            serdeParams.getNullSequence(),
-            serdeParams.isEscaped(),
-            serdeParams.getEscapeChar(),
-            serdeParams.getNeedsEscape());
+          boolean isNotNull = serialize(entry.getValue(), voi, 3);
           if (!isNotNull) {
             continue;
           }
-          byte [] key = new byte[serializeStream.getCount()];
-          System.arraycopy(
-            serializeStream.getData(), 0, key, 0, serializeStream.getCount());
-          batchUpdate.put(hbaseSparseColumn, key);
+          byte [] value = new byte[serializeStream.getCount()];
+          System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount());
+          put.add(hbaseColumnNamesBytes.get(i), columnQualifier, value);
         }
       }
     } else {
-      // If the field that is passed in is NOT a primitive, and either the 
-      // field is not declared (no schema was given at initialization), or 
-      // the field is declared as a primitive in initialization, serialize 
-      // the data to JSON string.  Otherwise serialize the data in the 
+      // If the field that is passed in is NOT a primitive, and either the
+      // field is not declared (no schema was given at initialization), or
+      // the field is declared as a primitive in initialization, serialize
+      // the data to JSON string.  Otherwise serialize the data in the
       // delimited way.
       serializeStream.reset();
       boolean isNotNull;
       if (!foi.getCategory().equals(Category.PRIMITIVE)
-        && (declaredFields == null || 
-          declaredFields.get(i).getFieldObjectInspector().getCategory()
-          .equals(Category.PRIMITIVE) || useJSONSerialize)) {
+          && (declaredFields == null ||
+              declaredFields.get(i).getFieldObjectInspector().getCategory()
+              .equals(Category.PRIMITIVE) || useJSONSerialize)) {
+
         isNotNull = serialize(
-          serializeStream, SerDeUtils.getJSONString(f, foi),
-          PrimitiveObjectInspectorFactory.javaStringObjectInspector,
-          serdeParams.getSeparators(), 1,
-          serdeParams.getNullSequence(),
-          serdeParams.isEscaped(),
-          serdeParams.getEscapeChar(),
-          serdeParams.getNeedsEscape());
+            SerDeUtils.getJSONString(f, foi),
+            PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+            1);
       } else {
-        isNotNull = serialize(
-          serializeStream, f, foi, 
-          serdeParams.getSeparators(), 1,
-          serdeParams.getNullSequence(),
-          serdeParams.isEscaped(),
-          serdeParams.getEscapeChar(),
-          serdeParams.getNeedsEscape());
+        isNotNull = serialize(f, foi, 1);
       }
       if (!isNotNull) {
         return null;
       }
       byte [] key = new byte[serializeStream.getCount()];
-      System.arraycopy(
-        serializeStream.getData(), 0, key, 0, serializeStream.getCount());
-      if (hbaseColumn.equals(HBASE_KEY_COL)) {
+      System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount());
+      if (i == iKey) {
         return key;
       }
-      batchUpdate.put(hbaseColumn, key);
+      put.add(hbaseColumnNamesBytes.get(i), 0, key);
     }
 
     return null;
   }
-  
+
   /**
    * Serialize the row into a ByteStream.
    *
-   * @param out  The ByteStream.Output to store the serialized data.
-   * @param obj The object for the current field.
+   * @param obj           The object for the current field.
    * @param objInspector  The ObjectInspector for the current Object.
-   * @param separators    The separators array.
    * @param level         The current level of separator.
-   * @param nullSequence  The byte sequence representing the NULL value.
-   * @param escaped       Whether we need to escape the data when writing out
-   * @param escapeChar    Which char to use as the escape char, e.g. '\\'     
-   * @param needsEscape   Which chars needs to be escaped.
-   *                      This array should have size of 128.
-   *                      Negative byte values (or byte values >= 128)
-   *                      are never escaped.
-   * @throws IOException 
-   * @return true, if serialize a not-null object; otherwise false.
+   * @throws IOException
+   * @return true, if serialize is a not-null object; otherwise false.
    */
-  public static boolean serialize(ByteStream.Output out, Object obj, 
-    ObjectInspector objInspector, byte[] separators, int level,
-    Text nullSequence, boolean escaped, byte escapeChar,
-    boolean[] needsEscape) throws IOException {
-    
+  private boolean serialize(Object obj, ObjectInspector objInspector, int level)
+      throws IOException {
+
     switch (objInspector.getCategory()) {
       case PRIMITIVE: {
         LazyUtils.writePrimitiveUTF8(
-          out, obj,
+          serializeStream, obj,
           (PrimitiveObjectInspector) objInspector,
           escaped, escapeChar, needsEscape);
         return true;
@@ -429,10 +427,9 @@ public class HBaseSerDe implements SerDe
         } else {
           for (int i = 0; i < list.size(); i++) {
             if (i > 0) {
-              out.write(separator);
+              serializeStream.write(separator);
             }
-            serialize(out, list.get(i), eoi, separators, level + 1,
-                nullSequence, escaped, escapeChar, needsEscape);
+            serialize(list.get(i), eoi, level + 1);
           }
         }
         return true;
@@ -443,7 +440,7 @@ public class HBaseSerDe implements SerDe
         MapObjectInspector moi = (MapObjectInspector) objInspector;
         ObjectInspector koi = moi.getMapKeyObjectInspector();
         ObjectInspector voi = moi.getMapValueObjectInspector();
-        
+
         Map<?, ?> map = moi.getMap(obj);
         if (map == null) {
           return false;
@@ -453,13 +450,11 @@ public class HBaseSerDe implements SerDe
             if (first) {
               first = false;
             } else {
-              out.write(separator);
+              serializeStream.write(separator);
             }
-            serialize(out, entry.getKey(), koi, separators, level+2, 
-                nullSequence, escaped, escapeChar, needsEscape);
-            out.write(keyValueSeparator);
-            serialize(out, entry.getValue(), voi, separators, level+2, 
-                nullSequence, escaped, escapeChar, needsEscape);
+            serialize(entry.getKey(), koi, level+2);
+            serializeStream.write(keyValueSeparator);
+            serialize(entry.getValue(), voi, level+2);
           }
         }
         return true;
@@ -474,22 +469,19 @@ public class HBaseSerDe implements SerDe
         } else {
           for (int i = 0; i < list.size(); i++) {
             if (i > 0) {
-              out.write(separator);
+              serializeStream.write(separator);
             }
-            serialize(out, list.get(i),
-                fields.get(i).getFieldObjectInspector(), separators, level + 1,
-                nullSequence, escaped, escapeChar, needsEscape);
+            serialize(list.get(i), fields.get(i).getFieldObjectInspector(), level + 1);
           }
         }
         return true;
       }
     }
-    
-    throw new RuntimeException("Unknown category type: "
-        + objInspector.getCategory());
+
+    throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
   }
-    
-  
+
+
   /**
    * @return the useJSONSerialize
    */
@@ -503,5 +495,4 @@ public class HBaseSerDe implements SerDe
   public void setUseJSONSerialize(boolean useJSONSerialize) {
     this.useJSONSerialize = useJSONSerialize;
   }
-
 }
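
As a usage note (not part of the patch): the hbase.columns.mapping property parsed above
pairs each Hive column with the row key (":key"), a single HBase column ("family:qualifier"),
or a whole column family ("family:"). A sketch of what the two helpers are expected to
return for a hypothetical mapping string, assuming the key is listed explicitly:

    import java.util.List;
    import org.apache.hadoop.hive.hbase.HBaseSerDe;

    public class ColumnMappingSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical mapping: row key, one fixed column, and an entire column family.
        String mapping = ":key,cf:val,attrs:";

        // parseColumnMapping splits the property into one entry per Hive column.
        List<String> columns = HBaseSerDe.parseColumnMapping(mapping);
        // expected: [":key", "cf:val", "attrs:"]

        // initColumnNamesBytes (new in this patch) keeps the full name for ordinary
        // columns and only the family name for trailing-colon entries.
        List<byte[]> columnBytes = HBaseSerDe.initColumnNamesBytes(columns);
        // expected: bytes of ":key", "cf:val", "attrs"
      }
    }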

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java Thu Jul 22 19:13:56 2010
@@ -23,53 +23,44 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.hbase.mapred.TableSplit;
 
 /**
  * HBaseSplit augments FileSplit with HBase column mapping.
  */
 public class HBaseSplit extends FileSplit implements InputSplit {
-  private String hbaseColumnMapping;
-  private TableSplit split;
-    
+  private final TableSplit split;
+
   public HBaseSplit() {
     super((Path) null, 0, 0, (String[]) null);
-    hbaseColumnMapping = "";
     split = new TableSplit();
   }
-    
-  public HBaseSplit(TableSplit split, String columnsMapping, Path dummyPath) {
+
+  public HBaseSplit(TableSplit split, Path dummyPath) {
     super(dummyPath, 0, 0, (String[]) null);
     this.split = split;
-    hbaseColumnMapping = columnsMapping;
   }
-    
+
   public TableSplit getSplit() {
     return this.split;
   }
-    
-  public String getColumnsMapping() {
-    return this.hbaseColumnMapping;
-  }
-    
+
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    hbaseColumnMapping = in.readUTF();
     split.readFields(in);
   }
 
   @Override
   public String toString() {
-    return "TableSplit " + split + " : " + hbaseColumnMapping;
+    return "TableSplit " + split;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    out.writeUTF(hbaseColumnMapping);
     split.write(out);
   }
 

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Thu Jul 22 19:13:56 2010
@@ -26,20 +26,17 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Constants;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -57,7 +54,7 @@ public class HBaseStorageHandler
 
   private HBaseConfiguration hbaseConf;
   private HBaseAdmin admin;
-  
+
   private HBaseAdmin getHBaseAdmin() throws MetaException {
     try {
       if (admin == null) {
@@ -88,12 +85,12 @@ public class HBaseStorageHandler
   public void preDropTable(Table table) throws MetaException {
     // nothing to do
   }
-  
+
   @Override
   public void rollbackDropTable(Table table) throws MetaException {
     // nothing to do
   }
-  
+
   @Override
   public void commitDropTable(
     Table tbl, boolean deleteData) throws MetaException {
@@ -130,12 +127,12 @@ public class HBaseStorageHandler
       // Check the hbase columns and get all the families
       Map<String, String> serdeParam =
         tbl.getSd().getSerdeInfo().getParameters();
-      String hbaseColumnStr = serdeParam.get(HBaseSerDe.HBASE_COL_MAPPING);
-      if (hbaseColumnStr == null) {
+      String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+      if (hbaseColumnsMapping == null) {
         throw new MetaException("No hbase.columns.mapping defined in Serde.");
       }
       List<String> hbaseColumns =
-        HBaseSerDe.parseColumnMapping(hbaseColumnStr);
+        HBaseSerDe.parseColumnMapping(hbaseColumnsMapping);
       int iKeyFirst = hbaseColumns.indexOf(HBaseSerDe.HBASE_KEY_COL);
       int iKeyLast = hbaseColumns.lastIndexOf(HBaseSerDe.HBASE_KEY_COL);
       if (iKeyFirst != iKeyLast) {
@@ -152,10 +149,10 @@ public class HBaseStorageHandler
         }
         columnFamilies.add(hbaseColumn.substring(0, idx));
       }
-  
+
       // Check if the given hbase table exists
       HTableDescriptor tblDesc;
-      
+
       if (!getHBaseAdmin().tableExists(tblName)) {
         // if it is not an external table then create one
         if (!isExternal) {
@@ -164,14 +161,14 @@ public class HBaseStorageHandler
           for (String cf : columnFamilies) {
             tblDesc.addFamily(new HColumnDescriptor(cf + ":"));
           }
-  
+
           getHBaseAdmin().createTable(tblDesc);
         } else {
           // an external table
-          throw new MetaException("HBase table " + tblName + 
+          throw new MetaException("HBase table " + tblName +
               " doesn't exist while the table is declared as an external table.");
         }
-      
+
       } else {
         if (!isExternal) {
           throw new MetaException("Table " + tblName + " already exists"
@@ -233,7 +230,7 @@ public class HBaseStorageHandler
   public Class<? extends InputFormat> getInputFormatClass() {
     return HiveHBaseTableInputFormat.class;
   }
-  
+
   @Override
   public Class<? extends OutputFormat> getOutputFormatClass() {
     return HiveHBaseTableOutputFormat.class;
@@ -255,10 +252,10 @@ public class HBaseStorageHandler
     Map<String, String> jobProperties) {
 
     Properties tableProperties = tableDesc.getProperties();
-    
+
     jobProperties.put(
-      HBaseSerDe.HBASE_COL_MAPPING,
-      tableProperties.getProperty(HBaseSerDe.HBASE_COL_MAPPING));
+      HBaseSerDe.HBASE_COLUMNS_MAPPING,
+      tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
 
     String tableName =
       tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_NAME);

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Thu Jul 22 19:13:56 2010
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -28,57 +27,51 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 /**
  * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
  * tables, decorating an underlying HBase TableInputFormat with extra Hive logic
  * such as column pruning.
  */
-public class HiveHBaseTableInputFormat<K extends ImmutableBytesWritable, V extends RowResult>
-    implements InputFormat<K, V>, JobConfigurable {
-  
-  static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class);
-  
-  private HBaseExposedTableInputFormat hbaseInputFormat;
+public class HiveHBaseTableInputFormat extends TableInputFormatBase
+    implements InputFormat<ImmutableBytesWritable, Result> {
 
-  public HiveHBaseTableInputFormat() {
-    hbaseInputFormat = new HBaseExposedTableInputFormat();
-  }
+  static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class);
 
   @Override
-  public RecordReader<K, V> getRecordReader(
-    InputSplit split, JobConf job,
-    Reporter reporter) throws IOException {
+  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+    InputSplit split,
+    JobConf jobConf,
+    final Reporter reporter) throws IOException {
 
     HBaseSplit hbaseSplit = (HBaseSplit) split;
+    String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+    setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
+
+    String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    List<String> columns = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping);
+    List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
 
-    byte [] tableNameBytes;
-    String hbaseTableName = job.get(HBaseSerDe.HBASE_TABLE_NAME);
-    hbaseInputFormat.setHBaseTable(
-      new HTable(
-        new HBaseConfiguration(job),
-        Bytes.toBytes(hbaseTableName)));
-    
-    String columnMapping = hbaseSplit.getColumnsMapping();
-    List<String> columns = HBaseSerDe.parseColumnMapping(columnMapping);
-    List<Integer> readColIDs =
-      ColumnProjectionUtils.getReadColumnIDs(job);
- 
     if (columns.size() < readColIDs.size()) {
-      throw new IOException(
-        "Cannot read more columns than the given table contains.");
+      throw new IOException("Cannot read more columns than the given table contains.");
     }
 
     List<byte []> scanColumns = new ArrayList<byte []>();
@@ -103,28 +96,92 @@ public class HiveHBaseTableInputFormat<K
         }
       }
     }
-    
-    hbaseInputFormat.setScanColumns(scanColumns.toArray(new byte[0][]));
-    
-    return (RecordReader<K, V>)
-      hbaseInputFormat.getRecordReader(hbaseSplit.getSplit(), job, reporter);
+
+    setScan(new Scan().addColumns(scanColumns.toArray(new byte[0][])));
+    org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit = hbaseSplit.getSplit();
+
+    Job job = new Job(jobConf);
+    TaskAttemptContext tac =
+      new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()) {
+
+        @Override
+        public void progress() {
+          reporter.progress();
+        }
+      };
+
+    final org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable, Result>
+    recordReader = createRecordReader(tableSplit, tac);
+
+    return new RecordReader<ImmutableBytesWritable, Result>() {
+
+      @Override
+      public void close() throws IOException {
+        recordReader.close();
+      }
+
+      @Override
+      public ImmutableBytesWritable createKey() {
+        return new ImmutableBytesWritable();
+      }
+
+      @Override
+      public Result createValue() {
+        return new Result();
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return 0;
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        float progress = 0.0F;
+
+        try {
+          progress = recordReader.getProgress();
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+
+        return progress;
+      }
+
+      @Override
+      public boolean next(ImmutableBytesWritable rowKey, Result value) throws IOException {
+
+        boolean next = false;
+
+        try {
+          next = recordReader.nextKeyValue();
+
+          if (next) {
+            rowKey.set(recordReader.getCurrentValue().getRow());
+            Writables.copyWritable(recordReader.getCurrentValue(), value);
+          }
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+
+        return next;
+      }
+    };
   }
 
   @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    Path [] tableNames = FileInputFormat.getInputPaths(job);
-    String hbaseTableName = job.get(HBaseSerDe.HBASE_TABLE_NAME);
-    hbaseInputFormat.setHBaseTable(
-      new HTable(new HBaseConfiguration(job), hbaseTableName));
-    
-    String hbaseSchemaMapping = job.get(HBaseSerDe.HBASE_COL_MAPPING);
-    if (hbaseSchemaMapping == null) {
+  public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+
+    String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+    setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
+    String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    if (hbaseColumnsMapping == null) {
       throw new IOException("hbase.columns.mapping required for HBase Table.");
     }
 
     // REVIEW:  are we supposed to be applying the getReadColumnIDs
     // same as in getRecordReader?
-    List<String> columns = HBaseSerDe.parseColumnMapping(hbaseSchemaMapping);
+    List<String> columns = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping);
     List<byte []> inputColumns = new ArrayList<byte []>();
     for (String column : columns) {
       if (HBaseSerDe.isSpecialColumn(column)) {
@@ -132,43 +189,18 @@ public class HiveHBaseTableInputFormat<K
       }
       inputColumns.add(Bytes.toBytes(column));
     }
-    
-    hbaseInputFormat.setScanColumns(inputColumns.toArray(new byte[0][]));
-    
-    InputSplit[] splits = hbaseInputFormat.getSplits(
-      job, numSplits <= 0 ? 1 : numSplits);
-    InputSplit[] results = new InputSplit[splits.length];
-    for (int i = 0; i < splits.length; i++) {
-      results[i] = new HBaseSplit(
-        (TableSplit) splits[i], hbaseSchemaMapping, tableNames[0]);
-    }
-    return results;
-  }
- 
-  @Override
-  public void configure(JobConf job) {
-    hbaseInputFormat.configure(job);
-  }
 
-  /**
-   * HBaseExposedTableInputFormat exposes some protected methods
-   * from the HBase TableInputFormatBase.
-   */
-  static class HBaseExposedTableInputFormat
-    extends org.apache.hadoop.hbase.mapred.TableInputFormatBase
-    implements JobConfigurable {
-
-    @Override
-    public void configure(JobConf job) {
-      // not needed for now
-    }
-    
-    public void setScanColumns(byte[][] scanColumns) {
-      setInputColumns(scanColumns);
-    }
-    
-    public void setHBaseTable(HTable table) {
-      setHTable(table);
+    setScan(new Scan().addColumns(inputColumns.toArray(new byte[0][])));
+    Job job = new Job(jobConf);
+    JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID());
+    Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
+    List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(jobContext);
+    InputSplit [] results = new InputSplit[splits.size()];
+
+    for (int i = 0; i < splits.size(); i++) {
+      results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
     }
+
+    return results;
   }
 }
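
One consequence of extending the new TableInputFormatBase is that the table name and column
mapping are now read from the JobConf instead of being carried inside each HBaseSplit. A
hedged sketch of how a caller could configure and query the input format directly; the table
name, mapping, and dummy path are hypothetical, and a live HBase cluster is required:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.hbase.HBaseSerDe;
    import org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    public class SplitSketch {
      public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        // Both properties now come from the JobConf; the mapping no longer
        // needs to be serialized into every HBaseSplit.
        jobConf.set(HBaseSerDe.HBASE_TABLE_NAME, "demo");
        jobConf.set(HBaseSerDe.HBASE_COLUMNS_MAPPING, ":key,cf:val");
        // A dummy input path is still needed because HBaseSplit extends FileSplit.
        FileInputFormat.setInputPaths(jobConf, new Path("/tmp/hbase_dummy"));

        HiveHBaseTableInputFormat inputFormat = new HiveHBaseTableInputFormat();
        InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
        System.out.println("splits: " + splits.length);
      }
    }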

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Thu Jul 22 19:13:56 2010
@@ -21,92 +21,108 @@ package org.apache.hadoop.hive.hbase;
 import java.io.IOException;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.BatchOperation;
-import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.util.Progressable;
 
 /**
  * HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables.
+ * We also need to implement the @deprecated org.apache.hadoop.mapred.OutFormat<?,?>
+ * class to keep it compliant with Hive interfaces.
  */
-public class HiveHBaseTableOutputFormat extends 
-    TableOutputFormat implements
-    HiveOutputFormat<ImmutableBytesWritable, BatchUpdate> {
-  
-  private final ImmutableBytesWritable key = new ImmutableBytesWritable();
+public class HiveHBaseTableOutputFormat extends
+    TableOutputFormat<ImmutableBytesWritable> implements
+    HiveOutputFormat<ImmutableBytesWritable, Put>,
+    OutputFormat<ImmutableBytesWritable, Put> {
+
+  static final Log LOG = LogFactory.getLog(HiveHBaseTableOutputFormat.class);
+  public static final String HBASE_WAL_ENABLED = "hive.hbase.wal.enabled";
 
   /**
-   * Update to the final out table, and output an empty key as the key.
-   * 
-   * @param jc
-   *          the job configuration file
-   * @param finalOutPath
-   *          the final output table name
-   * @param valueClass
-   *          the value class used for create
-   * @param isCompressed
-   *          whether the content is compressed or not
-   * @param tableProperties
-   *          the tableInfo of this file's corresponding table
-   * @param progress
-   *          progress used for status report
+   * Update the out table, and output an empty key as the key.
+   *
+   * @param jc the job configuration file
+   * @param finalOutPath the final output table name
+   * @param valueClass the value class
+   * @param isCompressed whether the content is compressed or not
+   * @param tableProperties the table info of the corresponding table
+   * @param progress progress used for status report
    * @return the RecordWriter for the output file
    */
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
-      Class<? extends Writable> valueClass, boolean isCompressed,
-      Properties tableProperties, Progressable progress) throws IOException {
+  public RecordWriter getHiveRecordWriter(
+      JobConf jc,
+      Path finalOutPath,
+      Class<? extends Writable> valueClass,
+      boolean isCompressed,
+      Properties tableProperties,
+      final Progressable progressable) throws IOException {
+
     String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME);
     jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
-
-    boolean walEnabled = HiveConf.getBoolVar(
-      jc, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
-    
-    HTable table = new HTable(new HBaseConfiguration(jc), hbaseTableName);
+    final boolean walEnabled = HiveConf.getBoolVar(
+        jc, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
+    final HTable table = new HTable(new HBaseConfiguration(jc), hbaseTableName);
     table.setAutoFlush(false);
-    return new HiveHBaseRecordWriter(table, walEnabled);
-  }
 
-  // This class was cloned from the HBase RecordWriter so that we
-  // can control the WAL setting.
-  private static class HiveHBaseRecordWriter implements RecordWriter {
-    private HTable table;
-    private boolean walEnabled;
-
-    HiveHBaseRecordWriter(HTable table, boolean walEnabled) {
-      this.table = table;
-      this.walEnabled = walEnabled;
-    }
+    return new RecordWriter() {
 
-    @Override
-    public void close(boolean abort) throws IOException {
-      if (!abort) {
-        table.flushCommits();
+      @Override
+      public void close(boolean abort) throws IOException {
+        if (!abort) {
+          table.flushCommits();
+        }
       }
-    }
 
-    @Override
-    public void write(Writable w) throws IOException {
-      BatchUpdate batchUpdate = (BatchUpdate) w;
-      for (BatchOperation bo : batchUpdate) {
-        assert(bo.isPut());
-        Put p = new Put(batchUpdate.getRow(), null);
-        if (!walEnabled) {
-          p.setWriteToWAL(false);
-        }
-        p.add(bo.getColumn(), batchUpdate.getTimestamp(), bo.getValue());
-        table.put(p);
+      @Override
+      public void write(Writable w) throws IOException {
+        Put put = (Put) w;
+        put.setWriteToWAL(walEnabled);
+        table.put(put);
       }
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
+
+    String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME);
+    jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
+    Job job = new Job(jc);
+    JobContext jobContext =
+      new JobContext(job.getConfiguration(), job.getJobID());
+
+    try {
+      checkOutputSpecs(jobContext);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
     }
   }
+
+  @Override
+  public
+  org.apache.hadoop.mapred.RecordWriter<ImmutableBytesWritable, Put>
+  getRecordWriter(
+      FileSystem fileSystem,
+      JobConf jobConf,
+      String name,
+      Progressable progressable) throws IOException {
+
+    throw new RuntimeException("Error: Hive should not invoke this method.");
+  }
 }
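
The rewritten record writer above buffers Put objects client-side (setAutoFlush(false)),
honors hive.hbase.wal.enabled per Put, and flushes on a clean close. A standalone sketch of
the same buffered, WAL-optional write outside Hive; the table, family, and values are
hypothetical:

    import java.util.List;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BufferedPutSketch {
      // Writes a batch of rows, deferring the RPCs until flushCommits().
      static void writeBatch(List<byte[]> rowKeys, boolean walEnabled) throws Exception {
        HTable table = new HTable(new HBaseConfiguration(), "demo");
        table.setAutoFlush(false); // buffer puts client-side, as the record writer does

        for (byte[] rowKey : rowKeys) {
          Put put = new Put(rowKey);
          put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("value"));
          put.setWriteToWAL(walEnabled); // disabling the WAL trades durability for speed
          table.put(put);
        }

        table.flushCommits(); // mirrors close(abort == false) in the record writer
      }
    }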

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java Thu Jul 22 19:13:56 2010
@@ -18,24 +18,21 @@
 
 package org.apache.hadoop.hive.hbase;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Map;
+import java.util.Properties;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.io.Text;
@@ -45,9 +42,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -61,13 +55,13 @@ public class HiveHFileOutputFormat exten
 
   private static final String HFILE_FAMILY_PATH = "hfile.family.path";
 
-  private static final Log LOG = LogFactory.getLog(
+  static final Log LOG = LogFactory.getLog(
     HiveHFileOutputFormat.class.getName());
 
-  private org.apache.hadoop.mapreduce.RecordWriter<
-    ImmutableBytesWritable, KeyValue> getFileWriter(
-      org.apache.hadoop.mapreduce.TaskAttemptContext tac) throws IOException
-  {
+  private
+  org.apache.hadoop.mapreduce.RecordWriter<ImmutableBytesWritable, KeyValue>
+  getFileWriter(org.apache.hadoop.mapreduce.TaskAttemptContext tac)
+  throws IOException {
     try {
       return super.getRecordWriter(tac);
     } catch (InterruptedException ex) {
@@ -77,8 +71,10 @@ public class HiveHFileOutputFormat exten
 
   @Override
   public RecordWriter getHiveRecordWriter(
-    final JobConf jc, final Path finalOutPath,
-    Class<? extends Writable> valueClass, boolean isCompressed,
+    final JobConf jc,
+    final Path finalOutPath,
+    Class<? extends Writable> valueClass,
+    boolean isCompressed,
     Properties tableProperties,
     final Progressable progressable) throws IOException {
 
@@ -209,5 +205,4 @@ public class HiveHFileOutputFormat exten
       }
     };
   }
-
 }

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java Thu Jul 22 19:13:56 2010
@@ -18,29 +18,30 @@
 
 package org.apache.hadoop.hive.hbase;
 
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Map.Entry;
 
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyMap;
 import org.apache.hadoop.hive.serde2.lazy.LazyObject;
 import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.io.Writable;
 
 /**
  * LazyHBaseCellMap refines LazyMap with HBase column mapping.
  */
 public class LazyHBaseCellMap extends LazyMap {
-  
-  private RowResult rowResult;
-  private String hbaseColumnFamily;
-  
+
+  private Result result;
+  private byte [] columnFamilyBytes;
+
   /**
    * Construct a LazyCellMap object with the ObjectInspector.
    * @param oi
@@ -49,76 +50,68 @@ public class LazyHBaseCellMap extends La
     super(oi);
   }
 
-  @Override
-  public void init(ByteArrayRef bytes, int start, int length) {
-    // do nothing
-  }
-  
-  public void init(RowResult rr, String columnFamily) {
-    rowResult = rr;
-    hbaseColumnFamily = columnFamily;
+  public void init(Result r, byte [] columnFamilyBytes) {
+    result = r;
+    this.columnFamilyBytes = columnFamilyBytes;
     setParsed(false);
   }
-  
+
   private void parse() {
     if (cachedMap == null) {
       cachedMap = new LinkedHashMap<Object, Object>();
     } else {
       cachedMap.clear();
     }
-    
-    Iterator<byte[]> iter = rowResult.keySet().iterator();
-    
-    byte[] columnFamily = hbaseColumnFamily.getBytes();
-    while (iter.hasNext()) {
-      byte [] columnKey = iter.next();
-      if (columnFamily.length > columnKey.length) {
-        continue;
-      }
-      
-      if (0 == LazyUtils.compare(
-          columnFamily, 0, columnFamily.length, 
-          columnKey, 0, columnFamily.length)) {
-
-        byte [] columnValue = rowResult.get(columnKey).getValue();
-        if (columnValue == null || columnValue.length == 0) {
-          // an empty object
+
+    NavigableMap<byte [], byte []> familyMap = result.getFamilyMap(columnFamilyBytes);
+
+    if (familyMap != null) {
+
+      for (Entry<byte [], byte []> e : familyMap.entrySet()) {
+        // null values and values of zero length are not added to the cachedMap
+        if (e.getValue() == null || e.getValue().length == 0) {
           continue;
         }
-        
+
         // Keys are always primitive
-        LazyPrimitive<?, ?> key = LazyFactory.createLazyPrimitiveClass(
-            (PrimitiveObjectInspector)
-            ((MapObjectInspector) getInspector()).getMapKeyObjectInspector());
+        LazyPrimitive<? extends ObjectInspector, ? extends Writable> key =
+          LazyFactory.createLazyPrimitiveClass(
+              (PrimitiveObjectInspector) getInspector().getMapKeyObjectInspector());
+
         ByteArrayRef keyRef = new ByteArrayRef();
-        keyRef.setData(columnKey);
-        key.init(
-          keyRef, columnFamily.length, columnKey.length - columnFamily.length);
-        
+        keyRef.setData(e.getKey());
+        key.init(keyRef, 0, keyRef.getData().length);
+
         // Value
-        LazyObject value = LazyFactory.createLazyObject(
-          ((MapObjectInspector) getInspector()).getMapValueObjectInspector());
+        LazyObject<?> value =
+          LazyFactory.createLazyObject(
+              getInspector().getMapValueObjectInspector());
+
         ByteArrayRef valueRef = new ByteArrayRef();
-        valueRef.setData(columnValue);
-        value.init(valueRef, 0, columnValue.length);
-        
-        // Put it into the map
+        valueRef.setData(e.getValue());
+        value.init(valueRef, 0, valueRef.getData().length);
+
+        // Put the key/value into the map
         cachedMap.put(key.getObject(), value.getObject());
       }
     }
+
+    setParsed(true);
   }
-  
+
+
   /**
    * Get the value in the map for the given key.
-   * 
+   *
    * @param key
    * @return
    */
+  @Override
   public Object getMapValueElement(Object key) {
     if (!getParsed()) {
       parse();
     }
-    
+
     for (Map.Entry<Object, Object> entry : cachedMap.entrySet()) {
       LazyPrimitive<?, ?> lazyKeyI = (LazyPrimitive<?, ?>) entry.getKey();
       // getWritableObject() will convert LazyPrimitive to actual primitive
@@ -129,26 +122,27 @@ public class LazyHBaseCellMap extends La
       }
       if (keyI.equals(key)) {
         // Got a match, return the value
-        LazyObject v = (LazyObject) entry.getValue();
+        LazyObject<?> v = (LazyObject<?>) entry.getValue();
         return v == null ? v : v.getObject();
       }
     }
-    
+
     return null;
   }
-  
+
+  @Override
   public Map<Object, Object> getMap() {
     if (!getParsed()) {
       parse();
     }
     return cachedMap;
   }
-  
+
+  @Override
   public int getMapSize() {
     if (!getParsed()) {
       parse();
     }
     return cachedMap.size();
   }
-
 }

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Thu Jul 22 19:13:56 2010
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.hbase;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyObject;
@@ -36,33 +37,35 @@ import org.apache.hadoop.hive.serde2.obj
  * primitive or non-primitive.
  */
 public class LazyHBaseRow extends LazyStruct {
-  
+
   /**
    * The HBase columns mapping of the row.
    */
+  private Result result;
   private List<String> hbaseColumns;
-  private RowResult rowResult;
+  private List<byte []> hbaseColumnsBytes;
   private ArrayList<Object> cachedList;
-  
+
   /**
    * Construct a LazyHBaseRow object with the ObjectInspector.
    */
   public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
     super(oi);
   }
-  
+
   /**
-   * Set the hbase row data(a RowResult writable) for this LazyStruct.
-   * @see LazyHBaseRow#init(RowResult)
+   * Set the HBase row data(a Result writable) for this LazyStruct.
+   * @see LazyHBaseRow#init(Result)
    */
-  public void init(RowResult rr, List<String> hbaseColumns) {
-    this.rowResult = rr;
+  public void init(Result r, List<String> hbaseColumns, List<byte []> hbaseColumnsBytes) {
+    result = r;
     this.hbaseColumns = hbaseColumns;
+    this.hbaseColumnsBytes = hbaseColumnsBytes;
     setParsed(false);
   }
 
   /**
-   * Parse the RowResult and fill each field.
+   * Parse the Result and fill each field.
    * @see LazyStruct#parse()
    */
   private void parse() {
@@ -74,13 +77,13 @@ public class LazyHBaseRow extends LazySt
         String hbaseColumn = hbaseColumns.get(i);
         if (hbaseColumn.endsWith(":")) {
           // a column family
-          getFields()[i] = 
+          getFields()[i] =
             new LazyHBaseCellMap(
               (LazyMapObjectInspector)
               fieldRefs.get(i).getFieldObjectInspector());
           continue;
         }
-        
+
         getFields()[i] = LazyFactory.createLazyObject(
           fieldRefs.get(i).getFieldObjectInspector());
       }
@@ -89,26 +92,27 @@ public class LazyHBaseRow extends LazySt
     Arrays.fill(getFieldInited(), false);
     setParsed(true);
   }
-  
+
   /**
-   * Get one field out of the hbase row.
-   * 
+   * Get one field out of the HBase row.
+   *
    * If the field is a primitive field, return the actual object.
    * Otherwise return the LazyObject.  This is because PrimitiveObjectInspector
    * does not have control over the object used by the user - the user simply
-   * directly uses the Object instead of going through 
-   * Object PrimitiveObjectInspector.get(Object).  
-   * 
+   * directly uses the Object instead of going through
+   * Object PrimitiveObjectInspector.get(Object).
+   *
    * @param fieldID  The field ID
    * @return         The field as a LazyObject
    */
+  @Override
   public Object getField(int fieldID) {
     if (!getParsed()) {
       parse();
     }
     return uncheckedGetField(fieldID);
   }
-  
+
   /**
    * Get the field out of the row without checking whether parsing is needed.
    * This is called by both getField and getFieldsAsList.
@@ -119,32 +123,34 @@ public class LazyHBaseRow extends LazySt
   private Object uncheckedGetField(int fieldID) {
     if (!getFieldInited()[fieldID]) {
       getFieldInited()[fieldID] = true;
-      
       ByteArrayRef ref = null;
-      
       String columnName = hbaseColumns.get(fieldID);
+      byte [] columnNameBytes = hbaseColumnsBytes.get(fieldID);
+
       if (columnName.equals(HBaseSerDe.HBASE_KEY_COL)) {
         ref = new ByteArrayRef();
-        ref.setData(rowResult.getRow());
+        ref.setData(result.getRow());
       } else {
         if (columnName.endsWith(":")) {
           // it is a column family
-          ((LazyHBaseCellMap) getFields()[fieldID]).init(
-            rowResult, columnName);
+          ((LazyHBaseCellMap) getFields()[fieldID]).init(result, columnNameBytes);
         } else {
           // it is a column
-          if (rowResult.containsKey(columnName)) {
-            ref = new ByteArrayRef();
-            ref.setData(rowResult.get(columnName).getValue());
-          } else {
+          byte [] res = result.getValue(columnNameBytes);
+          if (res == null) {
             return null;
+          } else {
+            ref = new ByteArrayRef();
+            ref.setData(res);
           }
         }
       }
+
       if (ref != null) {
         getFields()[fieldID].init(ref, 0, ref.getData().length);
       }
     }
+
     return getFields()[fieldID].getObject();
   }
 
@@ -152,6 +158,7 @@ public class LazyHBaseRow extends LazySt
    * Get the values of the fields as an ArrayList.
    * @return The values of the fields as an ArrayList.
    */
+  @Override
   public ArrayList<Object> getFieldsAsList() {
     if (!getParsed()) {
       parse();
@@ -166,10 +173,9 @@ public class LazyHBaseRow extends LazySt
     }
     return cachedList;
   }
-  
+
   @Override
   public Object getObject() {
     return this;
   }
-
 }
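
The LazyHBaseRow change above follows the same pattern: the row now carries a Result plus a
pre-encoded byte[] form of each mapped column, and uncheckedGetField() reads the key column with
result.getRow() and a plain column with result.getValue(columnNameBytes), which returns null when
the cell is absent. A minimal sketch of that lookup; the ":key" literal (taken from the tests
below) and the helper name are illustrative:

    import org.apache.hadoop.hbase.client.Result;

    public class RowLookupSketch {
      // Old (deprecated) style: rowResult.get(columnName).getValue()
      // New style, as used by uncheckedGetField() in LazyHBaseRow:
      public static byte[] lookup(Result result, String columnName, byte[] columnNameBytes) {
        if (columnName.equals(":key")) {
          return result.getRow();                // the row key itself
        }
        return result.getValue(columnNameBytes); // null if the column is missing
      }
    }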

Modified: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java Thu Jul 22 19:13:56 2010
@@ -37,8 +37,8 @@ import org.apache.zookeeper.Watcher;
  * HBaseTestSetup defines HBase-specific test fixtures which are
  * reused across testcases.
  */
-public class HBaseTestSetup extends TestSetup
-{
+public class HBaseTestSetup extends TestSetup {
+
   private MiniHBaseCluster hbaseCluster;
   private MiniZooKeeperCluster zooKeeperCluster;
   private int zooKeeperPort;
@@ -49,7 +49,7 @@ public class HBaseTestSetup extends Test
   public HBaseTestSetup(Test test) {
     super(test);
   }
-  
+
   void preTest(HiveConf conf) throws Exception {
     if (hbaseCluster == null) {
       // We set up fixtures on demand for the first testcase, and leave

Modified: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Thu Jul 22 19:13:56 2010
@@ -15,17 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.hbase;
 
+package org.apache.hadoop.hive.hbase;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import junit.framework.TestCase;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
@@ -38,8 +41,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
-import junit.framework.TestCase;
-
 /**
  * Tests the HBaseSerDe class.
  */
@@ -54,32 +55,39 @@ public class TestHBaseSerDe extends Test
     Configuration conf = new Configuration();
     Properties tbl = createProperties();
     serDe.initialize(conf, tbl);
-      
+
     byte[] colabyte   = "cola:abyte".getBytes();
     byte[] colbshort  = "colb:ashort".getBytes();
     byte[] colcint    = "colc:aint".getBytes();
     byte[] colalong   = "cola:along".getBytes();
     byte[] colbdouble = "colb:adouble".getBytes();
     byte[] colcstring = "colc:astring".getBytes();
-      
+
     // Data
-    HbaseMapWritable<byte[], Cell> cells =
-      new HbaseMapWritable<byte[], Cell>();
-    cells.put(colabyte,    new Cell("123".getBytes(), 0));
-    cells.put(colbshort,   new Cell("456".getBytes(), 0));
-    cells.put(colcint,     new Cell("789".getBytes(), 0));
-    cells.put(colalong,    new Cell("1000".getBytes(), 0));
-    cells.put(colbdouble,  new Cell("5.3".getBytes(), 0));
-    cells.put(colcstring,  new Cell("hive and hadoop".getBytes(), 0));
-    RowResult rr = new RowResult("test-row1".getBytes(), cells);
-    BatchUpdate bu = new BatchUpdate("test-row1".getBytes());
-    bu.put(colabyte,    "123".getBytes());
-    bu.put(colbshort,   "456".getBytes());
-    bu.put(colcint,     "789".getBytes());
-    bu.put(colalong,    "1000".getBytes());
-    bu.put(colbdouble,  "5.3".getBytes());
-    bu.put(colcstring,  "hive and hadoop".getBytes());
-      
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+        colabyte, 0, Bytes.toBytes("123")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+        colbshort, 0, Bytes.toBytes("456")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+        colcint, 0, Bytes.toBytes("789")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+        colalong, 0, Bytes.toBytes("1000")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+        colbdouble, 0, Bytes.toBytes("5.3")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+        colcstring, 0, Bytes.toBytes("hive and hadoop")));
+    Result r = new Result(kvs);
+
+    Put p = new Put(Bytes.toBytes("test-row1"));
+
+    p.add(colabyte, 0, Bytes.toBytes("123"));
+    p.add(colbshort, 0, Bytes.toBytes("456"));
+    p.add(colcint, 0, Bytes.toBytes("789"));
+    p.add(colalong, 0, Bytes.toBytes("1000"));
+    p.add(colbdouble, 0, Bytes.toBytes("5.3"));
+    p.add(colcstring, 0, Bytes.toBytes("hive and hadoop"));
+
     Object[] expectedFieldsData = {
       new Text("test-row1"),
       new ByteWritable((byte)123),
@@ -89,12 +97,12 @@ public class TestHBaseSerDe extends Test
       new DoubleWritable(5.3),
       new Text("hive and hadoop")
     };
-     
-    deserializeAndSerialize(serDe, rr, bu, expectedFieldsData);
+
+    deserializeAndSerialize(serDe, r, p, expectedFieldsData);
   }
 
   private void deserializeAndSerialize(
-    HBaseSerDe serDe, RowResult rr, BatchUpdate bu,
+      HBaseSerDe serDe, Result r, Put p,
       Object[] expectedFieldsData) throws SerDeException {
 
     // Get the row structure
@@ -102,34 +110,33 @@ public class TestHBaseSerDe extends Test
       serDe.getObjectInspector();
     List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
     assertEquals(7, fieldRefs.size());
-    
+
     // Deserialize
-    Object row = serDe.deserialize(rr);
+    Object row = serDe.deserialize(r);
     for (int i = 0; i < fieldRefs.size(); i++) {
       Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
       if (fieldData != null) {
-        fieldData = ((LazyPrimitive)fieldData).getWritableObject();
+        fieldData = ((LazyPrimitive<?, ?>)fieldData).getWritableObject();
       }
       assertEquals("Field " + i, expectedFieldsData[i], fieldData);
     }
-    // Serialize 
-    assertEquals(BatchUpdate.class, serDe.getSerializedClass());
-    BatchUpdate serializedBU = (BatchUpdate)serDe.serialize(row, oi);
-    assertEquals("Serialized data", bu.toString(), serializedBU.toString());
+    // Serialize
+    assertEquals(Put.class, serDe.getSerializedClass());
+    Put serializedPut = (Put) serDe.serialize(row, oi);
+    assertEquals("Serialized data", p.toString(), serializedPut.toString());
   }
 
   private Properties createProperties() {
     Properties tbl = new Properties();
-    
+
     // Set the configuration parameters
     tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
     tbl.setProperty("columns",
         "key,abyte,ashort,aint,along,adouble,astring");
     tbl.setProperty("columns.types",
         "string,tinyint:smallint:int:bigint:double:string");
-    tbl.setProperty(HBaseSerDe.HBASE_COL_MAPPING, 
+    tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
         "cola:abyte,colb:ashort,colc:aint,cola:along,colb:adouble,colc:astring");
     return tbl;
   }
-  
 }
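
In the TestHBaseSerDe rewrite above, the read fixture becomes a Result wrapping explicit KeyValues
and the write fixture becomes a Put holding the same cells. A condensed, single-column sketch of
that construction, using the same constructors and add() overload as the diff; the row and column
constants are illustrative, and the final printout (Result.size(), Put.getRow()) is only there to
make the example runnable:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FixtureSketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("test-row1");
        byte[] column = Bytes.toBytes("cola:abyte");

        // Read side: a Result assembled from explicit KeyValues.
        List<KeyValue> kvs = new ArrayList<KeyValue>();
        kvs.add(new KeyValue(row, column, 0, Bytes.toBytes("123")));
        Result r = new Result(kvs);

        // Write side: a Put carrying the same cell at timestamp 0.
        Put p = new Put(row);
        p.add(column, 0, Bytes.toBytes("123"));

        System.out.println(r.size() + " cell(s) read, put targets row " + Bytes.toString(p.getRow()));
      }
    }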

Modified: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java Thu Jul 22 19:13:56 2010
@@ -15,14 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.hbase;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.RowResult;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyString;
@@ -34,8 +38,6 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 
-import junit.framework.TestCase;
-
 /**
  * TestLazyHBaseObject is a test for the LazyHBaseXXX classes.
  */
@@ -47,44 +49,56 @@ public class TestLazyHBaseObject extends
     // Map of Integer to String
     Text nullSequence = new Text("\\N");
     ObjectInspector oi = LazyFactory.createLazyObjectInspector(
-      TypeInfoUtils.getTypeInfosFromTypeString("map<int,string>").get(0), 
+      TypeInfoUtils.getTypeInfosFromTypeString("map<int,string>").get(0),
       new byte[]{(byte)1, (byte)2}, 0, nullSequence, false, (byte)0);
-        
+
     LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi);
-        
-    // Intialize a row result
-    HbaseMapWritable<byte[], Cell> cells = new HbaseMapWritable<byte[], Cell>();
-    cells.put("cfa:col1".getBytes(), new Cell("cfacol1".getBytes(), 0));
-    cells.put("cfa:col2".getBytes(), new Cell("cfacol2".getBytes(), 0));
-    cells.put("cfb:2".getBytes(),    new Cell("def".getBytes(), 0));
-    cells.put("cfb:-1".getBytes(),   new Cell("".getBytes(), 0));
-    cells.put("cfb:0".getBytes(),    new Cell("0".getBytes(), 0));
-    cells.put("cfb:8".getBytes(),    new Cell("abc".getBytes(), 0));
-    cells.put("cfc:col3".getBytes(), new Cell("cfccol3".getBytes(), 0));
-        
-    RowResult rr = new RowResult("test-row".getBytes(), cells);
-        
-    b.init(rr, "cfb:");
-        
+
+    // Initialize a result
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa:col1"), 0, Bytes.toBytes("cfacol1")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa:col2"), 0, Bytes.toBytes("cfacol2")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb:2"), 0, Bytes.toBytes("def")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb:-1"), 0, Bytes.toBytes("")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb:0"), 0, Bytes.toBytes("0")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb:8"), 0, Bytes.toBytes("abc")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfc:col3"), 0, Bytes.toBytes("cfccol3")));
+
+    Result r = new Result(kvs);
+
+    b.init(r, "cfb".getBytes());
+
     assertEquals(
       new Text("def"),
       ((LazyString)b.getMapValueElement(
         new IntWritable(2))).getWritableObject());
+
     assertNull(b.getMapValueElement(new IntWritable(-1)));
+
     assertEquals(
       new Text("0"),
       ((LazyString)b.getMapValueElement(
         new IntWritable(0))).getWritableObject());
+
     assertEquals(
       new Text("abc"),
       ((LazyString)b.getMapValueElement(
         new IntWritable(8))).getWritableObject());
+
     assertNull(b.getMapValueElement(new IntWritable(12345)));
-        
+
     assertEquals("{0:'0',2:'def',8:'abc'}".replace('\'', '\"'),
       SerDeUtils.getJSONString(b, oi));
   }
-      
+
   /**
    * Test the LazyMap class with String-to-String.
    */
@@ -92,139 +106,171 @@ public class TestLazyHBaseObject extends
     // Map of String to String
     Text nullSequence = new Text("\\N");
     ObjectInspector oi = LazyFactory.createLazyObjectInspector(
-      TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get(0), 
+      TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get(0),
       new byte[]{(byte)'#', (byte)'\t'}, 0, nullSequence, false, (byte)0);
-        
+
     LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi);
-        
-    // Intialize a row result
-    HbaseMapWritable<byte[], Cell> cells =
-      new HbaseMapWritable<byte[], Cell>();
-    cells.put("cfa:col1".getBytes(), new Cell("cfacol1".getBytes(), 0));
-    cells.put("cfa:col2".getBytes(), new Cell("cfacol2".getBytes(), 0));
-    cells.put("cfb:2".getBytes(),    new Cell("d\tf".getBytes(), 0));
-    cells.put("cfb:-1".getBytes(),   new Cell("".getBytes(), 0));
-    cells.put("cfb:0".getBytes(),    new Cell("0".getBytes(), 0));
-    cells.put("cfb:8".getBytes(),    new Cell("abc".getBytes(), 0));
-    cells.put("cfc:col3".getBytes(), new Cell("cfccol3".getBytes(), 0));
-        
-    RowResult rr = new RowResult("test-row".getBytes(), cells);
-        
-    b.init(rr, "cfb:");
-        
+
+    // Initialize a result
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("col1"), 0, Bytes.toBytes("cfacol1")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("col2"), 0, Bytes.toBytes("cfacol2")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("2"), 0, Bytes.toBytes("d\tf")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("-1"), 0, Bytes.toBytes("")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("0"), 0, Bytes.toBytes("0")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("8"), 0, Bytes.toBytes("abc")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfc"), Bytes.toBytes("col3"), 0, Bytes.toBytes("cfccol3")));
+
+    Result r = new Result(kvs);
+    b.init(r, "cfb".getBytes());
+
     assertEquals(
       new Text("d\tf"),
       ((LazyString)b.getMapValueElement(
         new Text("2"))).getWritableObject());
+
     assertNull(b.getMapValueElement(new Text("-1")));
+
     assertEquals(
       new Text("0"),
       ((LazyString)b.getMapValueElement(
         new Text("0"))).getWritableObject());
+
     assertEquals(
       new Text("abc"),
       ((LazyString)b.getMapValueElement(
         new Text("8"))).getWritableObject());
+
     assertNull(b.getMapValueElement(new Text("-")));
-        
+
     assertEquals(
       "{'0':'0','2':'d\\tf','8':'abc'}".replace('\'', '\"'),
       SerDeUtils.getJSONString(b, oi));
   }
-  
+
   /**
    * Test the LazyHBaseRow class with one-for-one mappings between
    * Hive fields and HBase columns.
    */
   public void testLazyHBaseRow1() {
-    List<TypeInfo> fieldTypeInfos = 
+    List<TypeInfo> fieldTypeInfos =
       TypeInfoUtils.getTypeInfosFromTypeString(
         "string,int,array<string>,map<string,string>,string");
     List<String> fieldNames = Arrays.asList(
       new String[]{"key", "a", "b", "c", "d"});
     Text nullSequence = new Text("\\N");
-        
-    List<String> hbaseColumnNames = 
+
+    List<String> hbaseColumnNames =
       Arrays.asList(new String[]{":key", "cfa:a", "cfa:b", "cfb:c", "cfb:d"});
-        
+    List<byte []> hbaseColumnNamesBytes = HBaseSerDe.initColumnNamesBytes(hbaseColumnNames);
+
     ObjectInspector oi = LazyFactory.createLazyStructInspector(fieldNames,
       fieldTypeInfos, new byte[] {' ', ':', '='},
       nullSequence, false, false, (byte)0);
     LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi);
 
-    HbaseMapWritable<byte[], Cell> cells =
-      new HbaseMapWritable<byte[], Cell>();
-        
-    cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
-    cells.put("cfa:b".getBytes(), new Cell("a:b:c".getBytes(), 0));
-    cells.put("cfb:c".getBytes(), new Cell("d=e:f=g".getBytes(), 0));
-    cells.put("cfb:d".getBytes(), new Cell("hi".getBytes(), 0));
-    RowResult rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
-    assertEquals(
-      ("{'key':'test-row','a':123,'b':['a','b','c'],"
-        + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""), 
-      SerDeUtils.getJSONString(o, oi));
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
 
-    cells.clear();
-    cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
-    cells.put("cfb:c".getBytes(), new Cell("d=e:f=g".getBytes(), 0));
-    rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
-    assertEquals(
-      ("{'key':'test-row','a':123,'b':null,"
-        + "'c':{'d':'e','f':'g'},'d':null}").replace("'", "\""),
-      SerDeUtils.getJSONString(o, oi));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a:b:c")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("d=e:f=g")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("hi")));
+
+    Result r = new Result(kvs);
 
-    cells.clear();
-    cells.put("cfa:b".getBytes(), new Cell("a".getBytes(), 0));
-    cells.put("cfb:c".getBytes(), new Cell("d=\\N:f=g:h".getBytes(), 0));
-    cells.put("cfb:d".getBytes(), new Cell("no".getBytes(), 0));
-    rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
-      ("{'key':'test-row','a':null,'b':['a'],"
-        + "'c':{'d':null,'f':'g','h':null},'d':'no'}").replace("'", "\""),
+      ("{'key':'test-row','a':123,'b':['a','b','c'],"
+        + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));
 
-    cells.clear();
-    cells.put("cfa:b".getBytes(), new Cell(":a::".getBytes(), 0));
-    cells.put("cfb:d".getBytes(), new Cell("no".getBytes(), 0));
-    rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
+    kvs.clear();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("d=e:f=g")));
+    r = new Result(kvs);
+
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
+    assertEquals(
+        ("{'key':'test-row','a':123,'b':null,"
+          + "'c':{'d':'e','f':'g'},'d':null}").replace("'", "\""),
+        SerDeUtils.getJSONString(o, oi));
+
+    kvs.clear();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("d=\\N:f=g:h")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+    r = new Result(kvs);
+
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
+    assertEquals(
+        ("{'key':'test-row','a':null,'b':['a'],"
+          + "'c':{'d':null,'f':'g','h':null},'d':'no'}").replace("'", "\""),
+        SerDeUtils.getJSONString(o, oi));
+
+    kvs.clear();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes(":a::")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+    r = new Result(kvs);
+
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':null,'b':['','a','',''],"
         + "'c':null,'d':'no'}").replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));
 
-    cells.clear();
-    cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
-    cells.put("cfa:b".getBytes(), new Cell("".getBytes(), 0));
-    cells.put("cfb:c".getBytes(), new Cell("".getBytes(), 0));
-    cells.put("cfb:d".getBytes(), new Cell("".getBytes(), 0));
-    rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
+    kvs.clear();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("")));
+    r = new Result(kvs);
+
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       "{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));
   }
-      
+
   /**
    * Test the LazyHBaseRow class with a mapping from a Hive field to
    * an HBase column family.
    */
   public void testLazyHBaseRow2() {
     // column family is mapped to Map<string,string>
-    List<TypeInfo> fieldTypeInfos = 
+    List<TypeInfo> fieldTypeInfos =
       TypeInfoUtils.getTypeInfosFromTypeString(
         "string,int,array<string>,map<string,string>,string");
     List<String> fieldNames = Arrays.asList(
       new String[]{"key", "a", "b", "c", "d"});
     Text nullSequence = new Text("\\N");
-        
-    List<String> hbaseColumnNames = 
+
+    List<String> hbaseColumnNames =
       Arrays.asList(new String[]{":key", "cfa:a", "cfa:b", "cfb:", "cfc:d"});
-        
+    List<byte []> hbaseColumnNamesBytes = HBaseSerDe.initColumnNamesBytes(hbaseColumnNames);
+
     ObjectInspector oi = LazyFactory.createLazyStructInspector(
       fieldNames,
       fieldTypeInfos,
@@ -232,59 +278,79 @@ public class TestLazyHBaseObject extends
       nullSequence, false, false, (byte) 0);
     LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi);
 
-    HbaseMapWritable<byte[], Cell> cells =
-      new HbaseMapWritable<byte[], Cell>();
-        
-    cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
-    cells.put("cfa:b".getBytes(), new Cell("a:b:c".getBytes(), 0));
-    cells.put("cfb:d".getBytes(), new Cell("e".getBytes(), 0));
-    cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0));
-    cells.put("cfc:d".getBytes(), new Cell("hi".getBytes(), 0));
-    RowResult rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a:b:c")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("e")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("f"), 0, Bytes.toBytes("g")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("hi")));
+
+    Result r = new Result(kvs);
+
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':123,'b':['a','b','c'],"
-        + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""), 
+        + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));
 
-    cells.clear();
-    cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
-    cells.put("cfb:d".getBytes(), new Cell("e".getBytes(), 0));
-    cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0));
-    rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
+    kvs.clear();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("e")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("f"), 0, Bytes.toBytes("g")));
+    r = new Result(kvs);
+
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':123,'b':null,"
         + "'c':{'d':'e','f':'g'},'d':null}").replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));
 
-    cells.clear();
-    cells.put("cfa:b".getBytes(), new Cell("a".getBytes(), 0));
-    cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0));
-    cells.put("cfc:d".getBytes(), new Cell("no".getBytes(), 0));
-    rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
+    kvs.clear();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfb"), Bytes.toBytes("f"), 0, Bytes.toBytes("g")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+    r = new Result(kvs);
+
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':null,'b':['a'],"
         + "'c':{'f':'g'},'d':'no'}").replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));
 
-    cells.clear();
-    cells.put("cfa:b".getBytes(), new Cell(":a::".getBytes(), 0));
-    cells.put("cfc:d".getBytes(), new Cell("no".getBytes(), 0));
-    rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
+    kvs.clear();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes(":a::")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+    r = new Result(kvs);
+
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':null,'b':['','a','',''],"
         + "'c':{},'d':'no'}").replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));
 
-    cells.clear();
-    cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
-    cells.put("cfa:b".getBytes(), new Cell("".getBytes(), 0));
-    cells.put("cfc:d".getBytes(), new Cell("".getBytes(), 0));
-    rr = new RowResult("test-row".getBytes(), cells);
-    o.init(rr, hbaseColumnNames);
+    kvs.clear();
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+        Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("")));
+    r = new Result(kvs);
+
+    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       "{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));