You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2012/02/25 15:25:24 UTC

svn commit: r1293616 - in /hive/trunk/hbase-handler/src: java/org/apache/hadoop/hive/hbase/HBaseSerDe.java test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java

Author: hashutosh
Date: Sat Feb 25 14:25:24 2012
New Revision: 1293616

URL: http://svn.apache.org/viewvc?rev=1293616&view=rev
Log:
HIVE-2781: HBaseSerDe should allow users to specify the timestamp passed to Puts (toffer via hashutosh)

Modified:
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1293616&r1=1293615&r2=1293616&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Sat Feb 25 14:25:24 2012
@@ -63,6 +63,7 @@ public class HBaseSerDe implements SerDe
   public static final String HBASE_COLUMNS_MAPPING = "hbase.columns.mapping";
   public static final String HBASE_TABLE_NAME = "hbase.table.name";
   public static final String HBASE_KEY_COL = ":key";
+  public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
   public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
 
   private ObjectInspector cachedObjectInspector;
@@ -76,6 +77,7 @@ public class HBaseSerDe implements SerDe
   private LazyHBaseRow cachedHBaseRow;
   private final ByteStream.Output serializeStream = new ByteStream.Output();
   private int iKey;
+  private long putTimestamp;
 
   // used for serializing a field
   private byte [] separators;     // the separators array
@@ -253,6 +255,7 @@ public class HBaseSerDe implements SerDe
     // Read configuration parameters
     hbaseColumnsMapping = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
     String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+    putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
 
     // Parse the HBase columns mapping and initialize the col family & qualifiers
     hbaseColumnFamilies = new ArrayList<String>();
@@ -383,7 +386,10 @@ public class HBaseSerDe implements SerDe
         throw new SerDeException("HBase row key cannot be NULL");
       }
 
-      put = new Put(key);
+      if(putTimestamp >= 0)
+        put = new Put(key,putTimestamp);
+      else
+        put = new Put(key);
 
       // Serialize each field
       for (int i = 0; i < fields.size(); i++) {

Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=1293616&r1=1293615&r2=1293616&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Sat Feb 25 14:25:24 2012
@@ -115,6 +115,73 @@ public class TestHBaseSerDe extends Test
     deserializeAndSerialize(serDe, r, p, expectedFieldsData);
   }
 
+  public void testHBaseSerDeWithTimestamp() throws SerDeException {
+    // Initialize the SerDe with hbase.put.timestamp set; the expected Put below carries the same timestamp
+    HBaseSerDe serDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createProperties();
+    long putTimestamp = 1;
+    tbl.setProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,
+            Long.toString(putTimestamp));
+    serDe.initialize(conf, tbl);
+
+
+    byte [] cfa = "cola".getBytes();
+    byte [] cfb = "colb".getBytes();
+    byte [] cfc = "colc".getBytes();
+
+    byte [] qualByte = "byte".getBytes();
+    byte [] qualShort = "short".getBytes();
+    byte [] qualInt = "int".getBytes();
+    byte [] qualLong = "long".getBytes();
+    byte [] qualFloat = "float".getBytes();
+    byte [] qualDouble = "double".getBytes();
+    byte [] qualString = "string".getBytes();
+    byte [] qualBool = "boolean".getBytes();
+
+    byte [] rowKey = Bytes.toBytes("test-row1");
+
+    // Input cells for the row, sorted with KeyValue.COMPARATOR before being wrapped in a Result
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+    kvs.add(new KeyValue(rowKey, cfa, qualByte, Bytes.toBytes("123")));
+    kvs.add(new KeyValue(rowKey, cfb, qualShort, Bytes.toBytes("456")));
+    kvs.add(new KeyValue(rowKey, cfc, qualInt, Bytes.toBytes("789")));
+    kvs.add(new KeyValue(rowKey, cfa, qualLong, Bytes.toBytes("1000")));
+    kvs.add(new KeyValue(rowKey, cfb, qualFloat, Bytes.toBytes("-0.01")));
+    kvs.add(new KeyValue(rowKey, cfc, qualDouble, Bytes.toBytes("5.3")));
+    kvs.add(new KeyValue(rowKey, cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive")));
+    kvs.add(new KeyValue(rowKey, cfb, qualBool, Bytes.toBytes("true")));
+    Collections.sort(kvs, KeyValue.COMPARATOR);
+
+    Result r = new Result(kvs);
+    // Expected Put: the same cells as the Result, created with the explicit put timestamp
+    Put p = new Put(rowKey,putTimestamp);
+
+    p.add(cfa, qualByte, Bytes.toBytes("123"));
+    p.add(cfb, qualShort, Bytes.toBytes("456"));
+    p.add(cfc, qualInt, Bytes.toBytes("789"));
+    p.add(cfa, qualLong, Bytes.toBytes("1000"));
+    p.add(cfb, qualFloat, Bytes.toBytes("-0.01"));
+    p.add(cfc, qualDouble, Bytes.toBytes("5.3"));
+    p.add(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive"));
+    p.add(cfb, qualBool, Bytes.toBytes("true"));
+
+    Object[] expectedFieldsData = {
+      new Text("test-row1"),
+      new ByteWritable((byte)123),
+      new ShortWritable((short)456),
+      new IntWritable(789),
+      new LongWritable(1000),
+      new FloatWritable(-0.01F),
+      new DoubleWritable(5.3),
+      new Text("Hadoop, HBase, and Hive"),
+      new BooleanWritable(true)
+    };
+
+    deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+  }
+
   private void deserializeAndSerialize(
       HBaseSerDe serDe, Result r, Put p,
       Object[] expectedFieldsData) throws SerDeException {