You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2012/02/25 15:25:24 UTC
svn commit: r1293616 - in /hive/trunk/hbase-handler/src:
java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
Author: hashutosh
Date: Sat Feb 25 14:25:24 2012
New Revision: 1293616
URL: http://svn.apache.org/viewvc?rev=1293616&view=rev
Log:
HIVE-2781: HBaseSerDe should allow users to specify the timestamp passed to Puts (toffer via hashutosh)
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1293616&r1=1293615&r2=1293616&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Sat Feb 25 14:25:24 2012
@@ -63,6 +63,7 @@ public class HBaseSerDe implements SerDe
public static final String HBASE_COLUMNS_MAPPING = "hbase.columns.mapping";
public static final String HBASE_TABLE_NAME = "hbase.table.name";
public static final String HBASE_KEY_COL = ":key";
+ public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
private ObjectInspector cachedObjectInspector;
@@ -76,6 +77,7 @@ public class HBaseSerDe implements SerDe
private LazyHBaseRow cachedHBaseRow;
private final ByteStream.Output serializeStream = new ByteStream.Output();
private int iKey;
+ private long putTimestamp;
// used for serializing a field
private byte [] separators; // the separators array
@@ -253,6 +255,7 @@ public class HBaseSerDe implements SerDe
// Read configuration parameters
hbaseColumnsMapping = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+ putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1"));
// Parse the HBase columns mapping and initialize the col family & qualifiers
hbaseColumnFamilies = new ArrayList<String>();
@@ -383,7 +386,10 @@ public class HBaseSerDe implements SerDe
throw new SerDeException("HBase row key cannot be NULL");
}
- put = new Put(key);
+ if(putTimestamp >= 0)
+ put = new Put(key,putTimestamp);
+ else
+ put = new Put(key);
// Serialize each field
for (int i = 0; i < fields.size(); i++) {
Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=1293616&r1=1293615&r2=1293616&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original)
+++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Sat Feb 25 14:25:24 2012
@@ -115,6 +115,73 @@ public class TestHBaseSerDe extends Test
deserializeAndSerialize(serDe, r, p, expectedFieldsData);
}
+ public void testHBaseSerDeWithTimestamp() throws SerDeException {
+ // Create the SerDe
+ HBaseSerDe serDe = new HBaseSerDe();
+ Configuration conf = new Configuration();
+ Properties tbl = createProperties();
+ long putTimestamp = 1;
+ tbl.setProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,
+ Long.toString(putTimestamp));
+ serDe.initialize(conf, tbl);
+
+
+ byte [] cfa = "cola".getBytes();
+ byte [] cfb = "colb".getBytes();
+ byte [] cfc = "colc".getBytes();
+
+ byte [] qualByte = "byte".getBytes();
+ byte [] qualShort = "short".getBytes();
+ byte [] qualInt = "int".getBytes();
+ byte [] qualLong = "long".getBytes();
+ byte [] qualFloat = "float".getBytes();
+ byte [] qualDouble = "double".getBytes();
+ byte [] qualString = "string".getBytes();
+ byte [] qualBool = "boolean".getBytes();
+
+ byte [] rowKey = Bytes.toBytes("test-row1");
+
+ // Data
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+ kvs.add(new KeyValue(rowKey, cfa, qualByte, Bytes.toBytes("123")));
+ kvs.add(new KeyValue(rowKey, cfb, qualShort, Bytes.toBytes("456")));
+ kvs.add(new KeyValue(rowKey, cfc, qualInt, Bytes.toBytes("789")));
+ kvs.add(new KeyValue(rowKey, cfa, qualLong, Bytes.toBytes("1000")));
+ kvs.add(new KeyValue(rowKey, cfb, qualFloat, Bytes.toBytes("-0.01")));
+ kvs.add(new KeyValue(rowKey, cfc, qualDouble, Bytes.toBytes("5.3")));
+ kvs.add(new KeyValue(rowKey, cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive")));
+ kvs.add(new KeyValue(rowKey, cfb, qualBool, Bytes.toBytes("true")));
+ Collections.sort(kvs, KeyValue.COMPARATOR);
+
+ Result r = new Result(kvs);
+
+ Put p = new Put(rowKey,putTimestamp);
+
+ p.add(cfa, qualByte, Bytes.toBytes("123"));
+ p.add(cfb, qualShort, Bytes.toBytes("456"));
+ p.add(cfc, qualInt, Bytes.toBytes("789"));
+ p.add(cfa, qualLong, Bytes.toBytes("1000"));
+ p.add(cfb, qualFloat, Bytes.toBytes("-0.01"));
+ p.add(cfc, qualDouble, Bytes.toBytes("5.3"));
+ p.add(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive"));
+ p.add(cfb, qualBool, Bytes.toBytes("true"));
+
+ Object[] expectedFieldsData = {
+ new Text("test-row1"),
+ new ByteWritable((byte)123),
+ new ShortWritable((short)456),
+ new IntWritable(789),
+ new LongWritable(1000),
+ new FloatWritable(-0.01F),
+ new DoubleWritable(5.3),
+ new Text("Hadoop, HBase, and Hive"),
+ new BooleanWritable(true)
+ };
+
+ deserializeAndSerialize(serDe, r, p, expectedFieldsData);
+ }
+
private void deserializeAndSerialize(
HBaseSerDe serDe, Result r, Put p,
Object[] expectedFieldsData) throws SerDeException {