Posted to commits@hive.apache.org by jv...@apache.org on 2010/07/22 21:13:56 UTC
svn commit: r966811 - in /hadoop/hive/trunk: ./
hbase-handler/src/java/org/apache/hadoop/hive/hbase/
hbase-handler/src/test/org/apache/hadoop/hive/hbase/
Author: jvs
Date: Thu Jul 22 19:13:56 2010
New Revision: 966811
URL: http://svn.apache.org/viewvc?rev=966811&view=rev
Log:
HIVE-1229. Replace dependencies on HBase deprecated API.
(Basab Maulik via jvs)
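
For context, the gist of this change is that the hbase-handler now talks to HBase through the newer org.apache.hadoop.hbase.client API: rows are written as Put objects (replacing the deprecated BatchUpdate) and read back as Result objects (replacing the deprecated RowResult), with Bytes used for conversions. The following is only a minimal illustrative sketch of that read/write pattern, not code from this patch; the table name, column family, and qualifier below are hypothetical placeholders.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientApiSketch {

  public static void main(String[] args) throws IOException {
    // Hypothetical table, family, and qualifier, used only for illustration.
    HTable table = new HTable(new HBaseConfiguration(), "example_table");

    // Writing: Put replaces the deprecated BatchUpdate.
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("value"));
    table.put(put);
    table.flushCommits();

    // Reading: Result replaces the deprecated RowResult.
    Result result = table.get(new Get(Bytes.toBytes("row1")));
    byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"));
    System.out.println(Bytes.toString(result.getRow()) + " -> " + Bytes.toString(value));
  }
}
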
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Jul 22 19:13:56 2010
@@ -31,13 +31,16 @@ Trunk - Unreleased
case
(Siying Dong via Ning Zhang)
- HIVE-1469. replace isArray() calls and remove LOG.isInfoEnabled() in
+ HIVE-1469. replace isArray() calls and remove LOG.isInfoEnabled() in
Operator.forward()
(Yongqiang He via Ning Zhang)
HIVE-1463. Hive output file names are unnecessarily large
(Joydeep Sen Sarma via Ning Zhang)
+ HIVE-1229. Replace dependencies on HBase deprecated API.
+ (Basab Maulik via jvs)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Thu Jul 22 19:13:56 2010
@@ -28,8 +28,8 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.ByteStream;
@@ -52,8 +52,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
@@ -61,24 +59,29 @@ import org.apache.hadoop.io.Writable;
* deserialize objects from an HBase table.
*/
public class HBaseSerDe implements SerDe {
-
- public static final String HBASE_COL_MAPPING = "hbase.columns.mapping";
-
+
+ public static final String HBASE_COLUMNS_MAPPING = "hbase.columns.mapping";
public static final String HBASE_TABLE_NAME = "hbase.table.name";
-
public static final String HBASE_KEY_COL = ":key";
-
- public static final Log LOG = LogFactory.getLog(
- HBaseSerDe.class.getName());
+ public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
private ObjectInspector cachedObjectInspector;
private List<String> hbaseColumnNames;
+ private List<byte []> hbaseColumnNamesBytes;
private SerDeParameters serdeParams;
private boolean useJSONSerialize;
private LazyHBaseRow cachedHBaseRow;
- private ByteStream.Output serializeStream = new ByteStream.Output();
+ private final ByteStream.Output serializeStream = new ByteStream.Output();
private int iKey;
+ // used for serializing a field
+ private byte [] separators; // the separators array
+ private boolean escaped; // whether we need to escape the data when writing out
+ private byte escapeChar; // which char to use as the escape char, e.g. '\\'
+ private boolean [] needsEscape; // which chars need to be escaped. This array should have size
+ // of 128. Negative byte values (or byte values >= 128) are
+ // never escaped.
+ @Override
public String toString() {
return getClass().toString()
+ "["
@@ -90,32 +93,33 @@ public class HBaseSerDe implements SerDe
+ ((StructTypeInfo) serdeParams.getRowTypeInfo())
.getAllStructFieldTypeInfos() + "]";
}
-
+
public HBaseSerDe() throws SerDeException {
}
-
+
/**
* Initialize the SerDe given parameters.
* @see SerDe#initialize(Configuration, Properties)
*/
+ @Override
public void initialize(Configuration conf, Properties tbl)
throws SerDeException {
- initHBaseSerDeParameters(conf, tbl,
+ initHBaseSerDeParameters(conf, tbl,
getClass().getName());
-
+
cachedObjectInspector = LazyFactory.createLazyStructInspector(
- serdeParams.getColumnNames(),
- serdeParams.getColumnTypes(),
+ serdeParams.getColumnNames(),
+ serdeParams.getColumnTypes(),
serdeParams.getSeparators(),
serdeParams.getNullSequence(),
serdeParams.isLastColumnTakesRest(),
serdeParams.isEscaped(),
- serdeParams.getEscapeChar());
-
+ serdeParams.getEscapeChar());
+
cachedHBaseRow = new LazyHBaseRow(
(LazySimpleStructObjectInspector) cachedObjectInspector);
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("HBaseSerDe initialized with : columnNames = "
+ serdeParams.getColumnNames()
@@ -136,25 +140,45 @@ public class HBaseSerDe implements SerDe
}
return columnList;
}
-
+
+ public static List<byte []> initColumnNamesBytes(List<String> columnNames) {
+ List<byte []> columnBytes = new ArrayList<byte []>();
+ String column = null;
+
+ for (int i = 0; i < columnNames.size(); i++) {
+ column = columnNames.get(i);
+
+ if (column.endsWith(":")) {
+ columnBytes.add(Bytes.toBytes(column.split(":")[0]));
+ } else {
+ columnBytes.add(Bytes.toBytes(column));
+ }
+ }
+
+ return columnBytes;
+ }
+
public static boolean isSpecialColumn(String hbaseColumnName) {
return hbaseColumnName.equals(HBASE_KEY_COL);
}
-
+
private void initHBaseSerDeParameters(
- Configuration job, Properties tbl, String serdeName)
+ Configuration job, Properties tbl, String serdeName)
throws SerDeException {
// Read configuration parameters
String hbaseColumnNameProperty =
- tbl.getProperty(HBaseSerDe.HBASE_COL_MAPPING);
+ tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
String columnTypeProperty =
tbl.getProperty(Constants.LIST_COLUMN_TYPES);
-
- // Initialize the hbase column list
+
+ // Initialize the HBase column list
hbaseColumnNames = parseColumnMapping(hbaseColumnNameProperty);
iKey = hbaseColumnNames.indexOf(HBASE_KEY_COL);
-
+
+ // initialize the byte [] corresponding to each column name
+ hbaseColumnNamesBytes = initColumnNamesBytes(hbaseColumnNames);
+
// Build the type property string if not supplied
if (columnTypeProperty == null) {
StringBuilder sb = new StringBuilder();
@@ -180,20 +204,24 @@ public class HBaseSerDe implements SerDe
}
tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString());
}
-
- serdeParams = LazySimpleSerDe.initSerdeParams(
- job, tbl, serdeName);
-
+
+ serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
+
if (hbaseColumnNames.size() != serdeParams.getColumnNames().size()) {
- throw new SerDeException(serdeName + ": columns has " +
- serdeParams.getColumnNames().size() +
- " elements while hbase.columns.mapping has " +
+ throw new SerDeException(serdeName + ": columns has " +
+ serdeParams.getColumnNames().size() +
+ " elements while hbase.columns.mapping has " +
hbaseColumnNames.size() + " elements" +
" (counting the key if implicit)");
}
-
+
+ separators = serdeParams.getSeparators();
+ escaped = serdeParams.isEscaped();
+ escapeChar = serdeParams.getEscapeChar();
+ needsEscape = serdeParams.getNeedsEscape();
+
// check that the mapping schema is right;
- // we just can make sure that "columnfamily:" is mapped to MAP<String,?>
+ // we just can make sure that "column-family:" is mapped to MAP<String,?>
for (int i = 0; i < hbaseColumnNames.size(); i++) {
String hbaseColName = hbaseColumnNames.get(i);
if (hbaseColName.endsWith(":")) {
@@ -211,21 +239,22 @@ public class HBaseSerDe implements SerDe
}
}
}
-
+
/**
- * Deserialize a row from the HBase RowResult writable to a LazyObject
- * @param rowResult the HBase RowResult Writable contain a row
+ * Deserialize a row from the HBase Result writable to a LazyObject
+ * @param result the HBase Result Writable containing the row
* @return the deserialized object
- * @see SerDe#deserialize(Writable)
+ * @see SerDe#deserialize(Writable)
*/
- public Object deserialize(Writable rowResult) throws SerDeException {
-
- if (!(rowResult instanceof RowResult)) {
- throw new SerDeException(getClass().getName() + ": expects RowResult!");
- }
-
- RowResult rr = (RowResult)rowResult;
- cachedHBaseRow.init(rr, hbaseColumnNames);
+ @Override
+ public Object deserialize(Writable result) throws SerDeException {
+
+ if (!(result instanceof Result)) {
+ throw new SerDeException(getClass().getName() + ": expects Result!");
+ }
+
+ Result r = (Result)result;
+ cachedHBaseRow.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
return cachedHBaseRow;
}
@@ -236,15 +265,15 @@ public class HBaseSerDe implements SerDe
@Override
public Class<? extends Writable> getSerializedClass() {
- return BatchUpdate.class;
+ return Put.class;
}
@Override
public Writable serialize(Object obj, ObjectInspector objInspector)
throws SerDeException {
if (objInspector.getCategory() != Category.STRUCT) {
- throw new SerDeException(getClass().toString()
- + " can only serialize struct types, but we got: "
+ throw new SerDeException(getClass().toString()
+ + " can only serialize struct types, but we got: "
+ objInspector.getTypeName());
}
@@ -253,45 +282,48 @@ public class HBaseSerDe implements SerDe
List<? extends StructField> fields = soi.getAllStructFieldRefs();
List<Object> list = soi.getStructFieldsDataAsList(obj);
List<? extends StructField> declaredFields =
- (serdeParams.getRowTypeInfo() != null &&
+ (serdeParams.getRowTypeInfo() != null &&
((StructTypeInfo) serdeParams.getRowTypeInfo())
- .getAllStructFieldNames().size() > 0) ?
+ .getAllStructFieldNames().size() > 0) ?
((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs()
: null;
-
- BatchUpdate batchUpdate;
+
+ Put put = null;
try {
- byte [] key =
- serializeField(
- iKey, HBASE_KEY_COL, null, fields, list, declaredFields);
+ byte [] key = serializeField(iKey, null, fields, list, declaredFields);
+
if (key == null) {
throw new SerDeException("HBase row key cannot be NULL");
}
- batchUpdate = new BatchUpdate(key);
+
+ put = new Put(key);
+
// Serialize each field
for (int i = 0; i < fields.size(); i++) {
if (i == iKey) {
// already processed the key above
continue;
}
- String hbaseColumn = hbaseColumnNames.get(i);
- serializeField(
- i, hbaseColumn, batchUpdate, fields, list, declaredFields);
+ serializeField(i, put, fields, list, declaredFields);
}
} catch (IOException e) {
throw new SerDeException(e);
}
-
- return batchUpdate;
+
+ return put;
}
private byte [] serializeField(
- int i, String hbaseColumn, BatchUpdate batchUpdate,
+ int i,
+ Put put,
List<? extends StructField> fields,
List<Object> list,
List<? extends StructField> declaredFields) throws IOException {
+ // column name
+ String hbaseColumn = hbaseColumnNames.get(i);
+
// Get the field objectInspector and the field object.
ObjectInspector foi = fields.get(i).getFieldObjectInspector();
Object f = (list == null ? null : list.get(i));
@@ -300,7 +332,7 @@ public class HBaseSerDe implements SerDe
// a null object, we do not serialize it
return null;
}
-
+
// If the field corresponds to a column family in hbase
if (hbaseColumn.endsWith(":")) {
MapObjectInspector moi = (MapObjectInspector)foi;
@@ -314,107 +346,73 @@ public class HBaseSerDe implements SerDe
for (Map.Entry<?, ?> entry: map.entrySet()) {
// Get the Key
serializeStream.reset();
- serialize(serializeStream, entry.getKey(), koi,
- serdeParams.getSeparators(), 3,
- serdeParams.getNullSequence(),
- serdeParams.isEscaped(),
- serdeParams.getEscapeChar(),
- serdeParams.getNeedsEscape());
-
- // generate a column name (column_family:column_name)
- String hbaseSparseColumn =
- hbaseColumn + Bytes.toString(
- serializeStream.getData(), 0, serializeStream.getCount());
+ serialize(entry.getKey(), koi, 3);
+
+ // Get the column-qualifier
+ byte [] columnQualifier = new byte[serializeStream.getCount()];
+ System.arraycopy(serializeStream.getData(), 0, columnQualifier, 0, serializeStream.getCount());
// Get the Value
serializeStream.reset();
-
- boolean isNotNull = serialize(serializeStream, entry.getValue(), voi,
- serdeParams.getSeparators(), 3,
- serdeParams.getNullSequence(),
- serdeParams.isEscaped(),
- serdeParams.getEscapeChar(),
- serdeParams.getNeedsEscape());
+ boolean isNotNull = serialize(entry.getValue(), voi, 3);
if (!isNotNull) {
continue;
}
- byte [] key = new byte[serializeStream.getCount()];
- System.arraycopy(
- serializeStream.getData(), 0, key, 0, serializeStream.getCount());
- batchUpdate.put(hbaseSparseColumn, key);
+ byte [] value = new byte[serializeStream.getCount()];
+ System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount());
+ put.add(hbaseColumnNamesBytes.get(i), columnQualifier, value);
}
}
} else {
- // If the field that is passed in is NOT a primitive, and either the
- // field is not declared (no schema was given at initialization), or
- // the field is declared as a primitive in initialization, serialize
- // the data to JSON string. Otherwise serialize the data in the
+ // If the field that is passed in is NOT a primitive, and either the
+ // field is not declared (no schema was given at initialization), or
+ // the field is declared as a primitive in initialization, serialize
+ // the data to JSON string. Otherwise serialize the data in the
// delimited way.
serializeStream.reset();
boolean isNotNull;
if (!foi.getCategory().equals(Category.PRIMITIVE)
- && (declaredFields == null ||
- declaredFields.get(i).getFieldObjectInspector().getCategory()
- .equals(Category.PRIMITIVE) || useJSONSerialize)) {
+ && (declaredFields == null ||
+ declaredFields.get(i).getFieldObjectInspector().getCategory()
+ .equals(Category.PRIMITIVE) || useJSONSerialize)) {
+
isNotNull = serialize(
- serializeStream, SerDeUtils.getJSONString(f, foi),
- PrimitiveObjectInspectorFactory.javaStringObjectInspector,
- serdeParams.getSeparators(), 1,
- serdeParams.getNullSequence(),
- serdeParams.isEscaped(),
- serdeParams.getEscapeChar(),
- serdeParams.getNeedsEscape());
+ SerDeUtils.getJSONString(f, foi),
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ 1);
} else {
- isNotNull = serialize(
- serializeStream, f, foi,
- serdeParams.getSeparators(), 1,
- serdeParams.getNullSequence(),
- serdeParams.isEscaped(),
- serdeParams.getEscapeChar(),
- serdeParams.getNeedsEscape());
+ isNotNull = serialize(f, foi, 1);
}
if (!isNotNull) {
return null;
}
byte [] key = new byte[serializeStream.getCount()];
- System.arraycopy(
- serializeStream.getData(), 0, key, 0, serializeStream.getCount());
- if (hbaseColumn.equals(HBASE_KEY_COL)) {
+ System.arraycopy(serializeStream.getData(), 0, key, 0, serializeStream.getCount());
+ if (i == iKey) {
return key;
}
- batchUpdate.put(hbaseColumn, key);
+ put.add(hbaseColumnNamesBytes.get(i), 0, key);
}
return null;
}
-
+
/**
* Serialize the row into a ByteStream.
*
- * @param out The ByteStream.Output to store the serialized data.
- * @param obj The object for the current field.
+ * @param obj The object for the current field.
* @param objInspector The ObjectInspector for the current Object.
- * @param separators The separators array.
* @param level The current level of separator.
- * @param nullSequence The byte sequence representing the NULL value.
- * @param escaped Whether we need to escape the data when writing out
- * @param escapeChar Which char to use as the escape char, e.g. '\\'
- * @param needsEscape Which chars needs to be escaped.
- * This array should have size of 128.
- * Negative byte values (or byte values >= 128)
- * are never escaped.
- * @throws IOException
- * @return true, if serialize a not-null object; otherwise false.
+ * @throws IOException
+ * @return true, if serialize is a not-null object; otherwise false.
*/
- public static boolean serialize(ByteStream.Output out, Object obj,
- ObjectInspector objInspector, byte[] separators, int level,
- Text nullSequence, boolean escaped, byte escapeChar,
- boolean[] needsEscape) throws IOException {
-
+ private boolean serialize(Object obj, ObjectInspector objInspector, int level)
+ throws IOException {
+
switch (objInspector.getCategory()) {
case PRIMITIVE: {
LazyUtils.writePrimitiveUTF8(
- out, obj,
+ serializeStream, obj,
(PrimitiveObjectInspector) objInspector,
escaped, escapeChar, needsEscape);
return true;
@@ -429,10 +427,9 @@ public class HBaseSerDe implements SerDe
} else {
for (int i = 0; i < list.size(); i++) {
if (i > 0) {
- out.write(separator);
+ serializeStream.write(separator);
}
- serialize(out, list.get(i), eoi, separators, level + 1,
- nullSequence, escaped, escapeChar, needsEscape);
+ serialize(list.get(i), eoi, level + 1);
}
}
return true;
@@ -443,7 +440,7 @@ public class HBaseSerDe implements SerDe
MapObjectInspector moi = (MapObjectInspector) objInspector;
ObjectInspector koi = moi.getMapKeyObjectInspector();
ObjectInspector voi = moi.getMapValueObjectInspector();
-
+
Map<?, ?> map = moi.getMap(obj);
if (map == null) {
return false;
@@ -453,13 +450,11 @@ public class HBaseSerDe implements SerDe
if (first) {
first = false;
} else {
- out.write(separator);
+ serializeStream.write(separator);
}
- serialize(out, entry.getKey(), koi, separators, level+2,
- nullSequence, escaped, escapeChar, needsEscape);
- out.write(keyValueSeparator);
- serialize(out, entry.getValue(), voi, separators, level+2,
- nullSequence, escaped, escapeChar, needsEscape);
+ serialize(entry.getKey(), koi, level+2);
+ serializeStream.write(keyValueSeparator);
+ serialize(entry.getValue(), voi, level+2);
}
}
return true;
@@ -474,22 +469,19 @@ public class HBaseSerDe implements SerDe
} else {
for (int i = 0; i < list.size(); i++) {
if (i > 0) {
- out.write(separator);
+ serializeStream.write(separator);
}
- serialize(out, list.get(i),
- fields.get(i).getFieldObjectInspector(), separators, level + 1,
- nullSequence, escaped, escapeChar, needsEscape);
+ serialize(list.get(i), fields.get(i).getFieldObjectInspector(), level + 1);
}
}
return true;
}
}
-
- throw new RuntimeException("Unknown category type: "
- + objInspector.getCategory());
+
+ throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
}
-
-
+
+
/**
* @return the useJSONSerialize
*/
@@ -503,5 +495,4 @@ public class HBaseSerDe implements SerDe
public void setUseJSONSerialize(boolean useJSONSerialize) {
this.useJSONSerialize = useJSONSerialize;
}
-
}
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java Thu Jul 22 19:13:56 2010
@@ -23,53 +23,44 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.hbase.mapred.TableSplit;
/**
* HBaseSplit augments FileSplit with HBase column mapping.
*/
public class HBaseSplit extends FileSplit implements InputSplit {
- private String hbaseColumnMapping;
- private TableSplit split;
-
+ private final TableSplit split;
+
public HBaseSplit() {
super((Path) null, 0, 0, (String[]) null);
- hbaseColumnMapping = "";
split = new TableSplit();
}
-
- public HBaseSplit(TableSplit split, String columnsMapping, Path dummyPath) {
+
+ public HBaseSplit(TableSplit split, Path dummyPath) {
super(dummyPath, 0, 0, (String[]) null);
this.split = split;
- hbaseColumnMapping = columnsMapping;
}
-
+
public TableSplit getSplit() {
return this.split;
}
-
- public String getColumnsMapping() {
- return this.hbaseColumnMapping;
- }
-
+
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- hbaseColumnMapping = in.readUTF();
split.readFields(in);
}
@Override
public String toString() {
- return "TableSplit " + split + " : " + hbaseColumnMapping;
+ return "TableSplit " + split;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- out.writeUTF(hbaseColumnMapping);
split.write(out);
}
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Thu Jul 22 19:13:56 2010
@@ -26,20 +26,17 @@ import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Constants;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -57,7 +54,7 @@ public class HBaseStorageHandler
private HBaseConfiguration hbaseConf;
private HBaseAdmin admin;
-
+
private HBaseAdmin getHBaseAdmin() throws MetaException {
try {
if (admin == null) {
@@ -88,12 +85,12 @@ public class HBaseStorageHandler
public void preDropTable(Table table) throws MetaException {
// nothing to do
}
-
+
@Override
public void rollbackDropTable(Table table) throws MetaException {
// nothing to do
}
-
+
@Override
public void commitDropTable(
Table tbl, boolean deleteData) throws MetaException {
@@ -130,12 +127,12 @@ public class HBaseStorageHandler
// Check the hbase columns and get all the families
Map<String, String> serdeParam =
tbl.getSd().getSerdeInfo().getParameters();
- String hbaseColumnStr = serdeParam.get(HBaseSerDe.HBASE_COL_MAPPING);
- if (hbaseColumnStr == null) {
+ String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ if (hbaseColumnsMapping == null) {
throw new MetaException("No hbase.columns.mapping defined in Serde.");
}
List<String> hbaseColumns =
- HBaseSerDe.parseColumnMapping(hbaseColumnStr);
+ HBaseSerDe.parseColumnMapping(hbaseColumnsMapping);
int iKeyFirst = hbaseColumns.indexOf(HBaseSerDe.HBASE_KEY_COL);
int iKeyLast = hbaseColumns.lastIndexOf(HBaseSerDe.HBASE_KEY_COL);
if (iKeyFirst != iKeyLast) {
@@ -152,10 +149,10 @@ public class HBaseStorageHandler
}
columnFamilies.add(hbaseColumn.substring(0, idx));
}
-
+
// Check if the given hbase table exists
HTableDescriptor tblDesc;
-
+
if (!getHBaseAdmin().tableExists(tblName)) {
// if it is not an external table then create one
if (!isExternal) {
@@ -164,14 +161,14 @@ public class HBaseStorageHandler
for (String cf : columnFamilies) {
tblDesc.addFamily(new HColumnDescriptor(cf + ":"));
}
-
+
getHBaseAdmin().createTable(tblDesc);
} else {
// an external table
- throw new MetaException("HBase table " + tblName +
+ throw new MetaException("HBase table " + tblName +
" doesn't exist while the table is declared as an external table.");
}
-
+
} else {
if (!isExternal) {
throw new MetaException("Table " + tblName + " already exists"
@@ -233,7 +230,7 @@ public class HBaseStorageHandler
public Class<? extends InputFormat> getInputFormatClass() {
return HiveHBaseTableInputFormat.class;
}
-
+
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
return HiveHBaseTableOutputFormat.class;
@@ -255,10 +252,10 @@ public class HBaseStorageHandler
Map<String, String> jobProperties) {
Properties tableProperties = tableDesc.getProperties();
-
+
jobProperties.put(
- HBaseSerDe.HBASE_COL_MAPPING,
- tableProperties.getProperty(HBaseSerDe.HBASE_COL_MAPPING));
+ HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
String tableName =
tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_NAME);
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Thu Jul 22 19:13:56 2010
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.hbase;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -28,57 +27,51 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
* tables, decorating an underlying HBase TableInputFormat with extra Hive logic
* such as column pruning.
*/
-public class HiveHBaseTableInputFormat<K extends ImmutableBytesWritable, V extends RowResult>
- implements InputFormat<K, V>, JobConfigurable {
-
- static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class);
-
- private HBaseExposedTableInputFormat hbaseInputFormat;
+public class HiveHBaseTableInputFormat extends TableInputFormatBase
+ implements InputFormat<ImmutableBytesWritable, Result> {
- public HiveHBaseTableInputFormat() {
- hbaseInputFormat = new HBaseExposedTableInputFormat();
- }
+ static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class);
@Override
- public RecordReader<K, V> getRecordReader(
- InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+ InputSplit split,
+ JobConf jobConf,
+ final Reporter reporter) throws IOException {
HBaseSplit hbaseSplit = (HBaseSplit) split;
+ String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+ setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
+
+ String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ List<String> columns = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping);
+ List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
- byte [] tableNameBytes;
- String hbaseTableName = job.get(HBaseSerDe.HBASE_TABLE_NAME);
- hbaseInputFormat.setHBaseTable(
- new HTable(
- new HBaseConfiguration(job),
- Bytes.toBytes(hbaseTableName)));
-
- String columnMapping = hbaseSplit.getColumnsMapping();
- List<String> columns = HBaseSerDe.parseColumnMapping(columnMapping);
- List<Integer> readColIDs =
- ColumnProjectionUtils.getReadColumnIDs(job);
-
if (columns.size() < readColIDs.size()) {
- throw new IOException(
- "Cannot read more columns than the given table contains.");
+ throw new IOException("Cannot read more columns than the given table contains.");
}
List<byte []> scanColumns = new ArrayList<byte []>();
@@ -103,28 +96,92 @@ public class HiveHBaseTableInputFormat<K
}
}
}
-
- hbaseInputFormat.setScanColumns(scanColumns.toArray(new byte[0][]));
-
- return (RecordReader<K, V>)
- hbaseInputFormat.getRecordReader(hbaseSplit.getSplit(), job, reporter);
+
+ setScan(new Scan().addColumns(scanColumns.toArray(new byte[0][])));
+ org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit = hbaseSplit.getSplit();
+
+ Job job = new Job(jobConf);
+ TaskAttemptContext tac =
+ new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()) {
+
+ @Override
+ public void progress() {
+ reporter.progress();
+ }
+ };
+
+ final org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable, Result>
+ recordReader = createRecordReader(tableSplit, tac);
+
+ return new RecordReader<ImmutableBytesWritable, Result>() {
+
+ @Override
+ public void close() throws IOException {
+ recordReader.close();
+ }
+
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
+ }
+
+ @Override
+ public Result createValue() {
+ return new Result();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ float progress = 0.0F;
+
+ try {
+ progress = recordReader.getProgress();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ return progress;
+ }
+
+ @Override
+ public boolean next(ImmutableBytesWritable rowKey, Result value) throws IOException {
+
+ boolean next = false;
+
+ try {
+ next = recordReader.nextKeyValue();
+
+ if (next) {
+ rowKey.set(recordReader.getCurrentValue().getRow());
+ Writables.copyWritable(recordReader.getCurrentValue(), value);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ return next;
+ }
+ };
}
@Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- Path [] tableNames = FileInputFormat.getInputPaths(job);
- String hbaseTableName = job.get(HBaseSerDe.HBASE_TABLE_NAME);
- hbaseInputFormat.setHBaseTable(
- new HTable(new HBaseConfiguration(job), hbaseTableName));
-
- String hbaseSchemaMapping = job.get(HBaseSerDe.HBASE_COL_MAPPING);
- if (hbaseSchemaMapping == null) {
+ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+
+ String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+ setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
+ String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+ if (hbaseColumnsMapping == null) {
throw new IOException("hbase.columns.mapping required for HBase Table.");
}
// REVIEW: are we supposed to be applying the getReadColumnIDs
// same as in getRecordReader?
- List<String> columns = HBaseSerDe.parseColumnMapping(hbaseSchemaMapping);
+ List<String> columns = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping);
List<byte []> inputColumns = new ArrayList<byte []>();
for (String column : columns) {
if (HBaseSerDe.isSpecialColumn(column)) {
@@ -132,43 +189,18 @@ public class HiveHBaseTableInputFormat<K
}
inputColumns.add(Bytes.toBytes(column));
}
-
- hbaseInputFormat.setScanColumns(inputColumns.toArray(new byte[0][]));
-
- InputSplit[] splits = hbaseInputFormat.getSplits(
- job, numSplits <= 0 ? 1 : numSplits);
- InputSplit[] results = new InputSplit[splits.length];
- for (int i = 0; i < splits.length; i++) {
- results[i] = new HBaseSplit(
- (TableSplit) splits[i], hbaseSchemaMapping, tableNames[0]);
- }
- return results;
- }
-
- @Override
- public void configure(JobConf job) {
- hbaseInputFormat.configure(job);
- }
- /**
- * HBaseExposedTableInputFormat exposes some protected methods
- * from the HBase TableInputFormatBase.
- */
- static class HBaseExposedTableInputFormat
- extends org.apache.hadoop.hbase.mapred.TableInputFormatBase
- implements JobConfigurable {
-
- @Override
- public void configure(JobConf job) {
- // not needed for now
- }
-
- public void setScanColumns(byte[][] scanColumns) {
- setInputColumns(scanColumns);
- }
-
- public void setHBaseTable(HTable table) {
- setHTable(table);
+ setScan(new Scan().addColumns(inputColumns.toArray(new byte[0][])));
+ Job job = new Job(jobConf);
+ JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID());
+ Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
+ List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(jobContext);
+ InputSplit [] results = new InputSplit[splits.size()];
+
+ for (int i = 0; i < splits.size(); i++) {
+ results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
}
+
+ return results;
}
}
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Thu Jul 22 19:13:56 2010
@@ -21,92 +21,108 @@ package org.apache.hadoop.hive.hbase;
import java.io.IOException;
import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.BatchOperation;
-import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.Progressable;
/**
* HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables.
+ * We also need to implement the @deprecated org.apache.hadoop.mapred.OutFormat<?,?>
+ * class to keep it compliant with Hive interfaces.
*/
-public class HiveHBaseTableOutputFormat extends
- TableOutputFormat implements
- HiveOutputFormat<ImmutableBytesWritable, BatchUpdate> {
-
- private final ImmutableBytesWritable key = new ImmutableBytesWritable();
+public class HiveHBaseTableOutputFormat extends
+ TableOutputFormat<ImmutableBytesWritable> implements
+ HiveOutputFormat<ImmutableBytesWritable, Put>,
+ OutputFormat<ImmutableBytesWritable, Put> {
+
+ static final Log LOG = LogFactory.getLog(HiveHBaseTableOutputFormat.class);
+ public static final String HBASE_WAL_ENABLED = "hive.hbase.wal.enabled";
/**
- * Update to the final out table, and output an empty key as the key.
- *
- * @param jc
- * the job configuration file
- * @param finalOutPath
- * the final output table name
- * @param valueClass
- * the value class used for create
- * @param isCompressed
- * whether the content is compressed or not
- * @param tableProperties
- * the tableInfo of this file's corresponding table
- * @param progress
- * progress used for status report
+ * Update the out table, and output an empty key as the key.
+ *
+ * @param jc the job configuration file
+ * @param finalOutPath the final output table name
+ * @param valueClass the value class
+ * @param isCompressed whether the content is compressed or not
+ * @param tableProperties the table info of the corresponding table
+ * @param progress progress used for status report
* @return the RecordWriter for the output file
*/
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
- Class<? extends Writable> valueClass, boolean isCompressed,
- Properties tableProperties, Progressable progress) throws IOException {
+ public RecordWriter getHiveRecordWriter(
+ JobConf jc,
+ Path finalOutPath,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ final Progressable progressable) throws IOException {
+
String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME);
jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
-
- boolean walEnabled = HiveConf.getBoolVar(
- jc, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
-
- HTable table = new HTable(new HBaseConfiguration(jc), hbaseTableName);
+ final boolean walEnabled = HiveConf.getBoolVar(
+ jc, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
+ final HTable table = new HTable(new HBaseConfiguration(jc), hbaseTableName);
table.setAutoFlush(false);
- return new HiveHBaseRecordWriter(table, walEnabled);
- }
- // This class was cloned from the HBase RecordWriter so that we
- // can control the WAL setting.
- private static class HiveHBaseRecordWriter implements RecordWriter {
- private HTable table;
- private boolean walEnabled;
-
- HiveHBaseRecordWriter(HTable table, boolean walEnabled) {
- this.table = table;
- this.walEnabled = walEnabled;
- }
+ return new RecordWriter() {
- @Override
- public void close(boolean abort) throws IOException {
- if (!abort) {
- table.flushCommits();
+ @Override
+ public void close(boolean abort) throws IOException {
+ if (!abort) {
+ table.flushCommits();
+ }
}
- }
- @Override
- public void write(Writable w) throws IOException {
- BatchUpdate batchUpdate = (BatchUpdate) w;
- for (BatchOperation bo : batchUpdate) {
- assert(bo.isPut());
- Put p = new Put(batchUpdate.getRow(), null);
- if (!walEnabled) {
- p.setWriteToWAL(false);
- }
- p.add(bo.getColumn(), batchUpdate.getTimestamp(), bo.getValue());
- table.put(p);
+ @Override
+ public void write(Writable w) throws IOException {
+ Put put = (Put) w;
+ put.setWriteToWAL(walEnabled);
+ table.put(put);
}
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
+
+ String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME);
+ jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
+ Job job = new Job(jc);
+ JobContext jobContext =
+ new JobContext(job.getConfiguration(), job.getJobID());
+
+ try {
+ checkOutputSpecs(jobContext);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
}
}
+
+ @Override
+ public
+ org.apache.hadoop.mapred.RecordWriter<ImmutableBytesWritable, Put>
+ getRecordWriter(
+ FileSystem fileSystem,
+ JobConf jobConf,
+ String name,
+ Progressable progressable) throws IOException {
+
+ throw new RuntimeException("Error: Hive should not invoke this method.");
+ }
}
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java Thu Jul 22 19:13:56 2010
@@ -18,24 +18,21 @@
package org.apache.hadoop.hive.hbase;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Map;
+import java.util.Properties;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Text;
@@ -45,9 +42,6 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Progressable;
/**
@@ -61,13 +55,13 @@ public class HiveHFileOutputFormat exten
private static final String HFILE_FAMILY_PATH = "hfile.family.path";
- private static final Log LOG = LogFactory.getLog(
+ static final Log LOG = LogFactory.getLog(
HiveHFileOutputFormat.class.getName());
- private org.apache.hadoop.mapreduce.RecordWriter<
- ImmutableBytesWritable, KeyValue> getFileWriter(
- org.apache.hadoop.mapreduce.TaskAttemptContext tac) throws IOException
- {
+ private
+ org.apache.hadoop.mapreduce.RecordWriter<ImmutableBytesWritable, KeyValue>
+ getFileWriter(org.apache.hadoop.mapreduce.TaskAttemptContext tac)
+ throws IOException {
try {
return super.getRecordWriter(tac);
} catch (InterruptedException ex) {
@@ -77,8 +71,10 @@ public class HiveHFileOutputFormat exten
@Override
public RecordWriter getHiveRecordWriter(
- final JobConf jc, final Path finalOutPath,
- Class<? extends Writable> valueClass, boolean isCompressed,
+ final JobConf jc,
+ final Path finalOutPath,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
Properties tableProperties,
final Progressable progressable) throws IOException {
@@ -209,5 +205,4 @@ public class HiveHFileOutputFormat exten
}
};
}
-
}
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java Thu Jul 22 19:13:56 2010
@@ -18,29 +18,30 @@
package org.apache.hadoop.hive.hbase;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Map.Entry;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyMap;
import org.apache.hadoop.hive.serde2.lazy.LazyObject;
import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.io.Writable;
/**
* LazyHBaseCellMap refines LazyMap with HBase column mapping.
*/
public class LazyHBaseCellMap extends LazyMap {
-
- private RowResult rowResult;
- private String hbaseColumnFamily;
-
+
+ private Result result;
+ private byte [] columnFamilyBytes;
+
/**
* Construct a LazyCellMap object with the ObjectInspector.
* @param oi
@@ -49,76 +50,68 @@ public class LazyHBaseCellMap extends La
super(oi);
}
- @Override
- public void init(ByteArrayRef bytes, int start, int length) {
- // do nothing
- }
-
- public void init(RowResult rr, String columnFamily) {
- rowResult = rr;
- hbaseColumnFamily = columnFamily;
+ public void init(Result r, byte [] columnFamilyBytes) {
+ result = r;
+ this.columnFamilyBytes = columnFamilyBytes;
setParsed(false);
}
-
+
private void parse() {
if (cachedMap == null) {
cachedMap = new LinkedHashMap<Object, Object>();
} else {
cachedMap.clear();
}
-
- Iterator<byte[]> iter = rowResult.keySet().iterator();
-
- byte[] columnFamily = hbaseColumnFamily.getBytes();
- while (iter.hasNext()) {
- byte [] columnKey = iter.next();
- if (columnFamily.length > columnKey.length) {
- continue;
- }
-
- if (0 == LazyUtils.compare(
- columnFamily, 0, columnFamily.length,
- columnKey, 0, columnFamily.length)) {
-
- byte [] columnValue = rowResult.get(columnKey).getValue();
- if (columnValue == null || columnValue.length == 0) {
- // an empty object
+
+ NavigableMap<byte [], byte []> familyMap = result.getFamilyMap(columnFamilyBytes);
+
+ if (familyMap != null) {
+
+ for (Entry<byte [], byte []> e : familyMap.entrySet()) {
+ // null values and values of zero length are not added to the cachedMap
+ if (e.getValue() == null || e.getValue().length == 0) {
continue;
}
-
+
// Keys are always primitive
- LazyPrimitive<?, ?> key = LazyFactory.createLazyPrimitiveClass(
- (PrimitiveObjectInspector)
- ((MapObjectInspector) getInspector()).getMapKeyObjectInspector());
+ LazyPrimitive<? extends ObjectInspector, ? extends Writable> key =
+ LazyFactory.createLazyPrimitiveClass(
+ (PrimitiveObjectInspector) getInspector().getMapKeyObjectInspector());
+
ByteArrayRef keyRef = new ByteArrayRef();
- keyRef.setData(columnKey);
- key.init(
- keyRef, columnFamily.length, columnKey.length - columnFamily.length);
-
+ keyRef.setData(e.getKey());
+ key.init(keyRef, 0, keyRef.getData().length);
+
// Value
- LazyObject value = LazyFactory.createLazyObject(
- ((MapObjectInspector) getInspector()).getMapValueObjectInspector());
+ LazyObject<?> value =
+ LazyFactory.createLazyObject(
+ getInspector().getMapValueObjectInspector());
+
ByteArrayRef valueRef = new ByteArrayRef();
- valueRef.setData(columnValue);
- value.init(valueRef, 0, columnValue.length);
-
- // Put it into the map
+ valueRef.setData(e.getValue());
+ value.init(valueRef, 0, valueRef.getData().length);
+
+ // Put the key/value into the map
cachedMap.put(key.getObject(), value.getObject());
}
}
+
+ setParsed(true);
}
-
+
+
/**
* Get the value in the map for the given key.
- *
+ *
* @param key
* @return
*/
+ @Override
public Object getMapValueElement(Object key) {
if (!getParsed()) {
parse();
}
-
+
for (Map.Entry<Object, Object> entry : cachedMap.entrySet()) {
LazyPrimitive<?, ?> lazyKeyI = (LazyPrimitive<?, ?>) entry.getKey();
// getWritableObject() will convert LazyPrimitive to actual primitive
@@ -129,26 +122,27 @@ public class LazyHBaseCellMap extends La
}
if (keyI.equals(key)) {
// Got a match, return the value
- LazyObject v = (LazyObject) entry.getValue();
+ LazyObject<?> v = (LazyObject<?>) entry.getValue();
return v == null ? v : v.getObject();
}
}
-
+
return null;
}
-
+
+ @Override
public Map<Object, Object> getMap() {
if (!getParsed()) {
parse();
}
return cachedMap;
}
-
+
+ @Override
public int getMapSize() {
if (!getParsed()) {
parse();
}
return cachedMap.size();
}
-
}
Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Thu Jul 22 19:13:56 2010
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.hbase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObject;
@@ -36,33 +37,35 @@ import org.apache.hadoop.hive.serde2.obj
* primitive or non-primitive.
*/
public class LazyHBaseRow extends LazyStruct {
-
+
/**
* The HBase columns mapping of the row.
*/
+ private Result result;
private List<String> hbaseColumns;
- private RowResult rowResult;
+ private List<byte []> hbaseColumnsBytes;
private ArrayList<Object> cachedList;
-
+
/**
* Construct a LazyHBaseRow object with the ObjectInspector.
*/
public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
super(oi);
}
-
+
/**
- * Set the hbase row data(a RowResult writable) for this LazyStruct.
- * @see LazyHBaseRow#init(RowResult)
+ * Set the HBase row data(a Result writable) for this LazyStruct.
+ * @see LazyHBaseRow#init(Result)
*/
- public void init(RowResult rr, List<String> hbaseColumns) {
- this.rowResult = rr;
+ public void init(Result r, List<String> hbaseColumns, List<byte []> hbaseColumnsBytes) {
+ result = r;
this.hbaseColumns = hbaseColumns;
+ this.hbaseColumnsBytes = hbaseColumnsBytes;
setParsed(false);
}
/**
- * Parse the RowResult and fill each field.
+ * Parse the Result and fill each field.
* @see LazyStruct#parse()
*/
private void parse() {
@@ -74,13 +77,13 @@ public class LazyHBaseRow extends LazySt
String hbaseColumn = hbaseColumns.get(i);
if (hbaseColumn.endsWith(":")) {
// a column family
- getFields()[i] =
+ getFields()[i] =
new LazyHBaseCellMap(
(LazyMapObjectInspector)
fieldRefs.get(i).getFieldObjectInspector());
continue;
}
-
+
getFields()[i] = LazyFactory.createLazyObject(
fieldRefs.get(i).getFieldObjectInspector());
}
@@ -89,26 +92,27 @@ public class LazyHBaseRow extends LazySt
Arrays.fill(getFieldInited(), false);
setParsed(true);
}
-
+
/**
- * Get one field out of the hbase row.
- *
+ * Get one field out of the HBase row.
+ *
* If the field is a primitive field, return the actual object.
* Otherwise return the LazyObject. This is because PrimitiveObjectInspector
* does not have control over the object used by the user - the user simply
- * directly uses the Object instead of going through
- * Object PrimitiveObjectInspector.get(Object).
- *
+ * directly uses the Object instead of going through
+ * Object PrimitiveObjectInspector.get(Object).
+ *
* @param fieldID The field ID
* @return The field as a LazyObject
*/
+ @Override
public Object getField(int fieldID) {
if (!getParsed()) {
parse();
}
return uncheckedGetField(fieldID);
}
-
+
/**
* Get the field out of the row without checking whether parsing is needed.
* This is called by both getField and getFieldsAsList.
@@ -119,32 +123,34 @@ public class LazyHBaseRow extends LazySt
private Object uncheckedGetField(int fieldID) {
if (!getFieldInited()[fieldID]) {
getFieldInited()[fieldID] = true;
-
ByteArrayRef ref = null;
-
String columnName = hbaseColumns.get(fieldID);
+ byte [] columnNameBytes = hbaseColumnsBytes.get(fieldID);
+
if (columnName.equals(HBaseSerDe.HBASE_KEY_COL)) {
ref = new ByteArrayRef();
- ref.setData(rowResult.getRow());
+ ref.setData(result.getRow());
} else {
if (columnName.endsWith(":")) {
// it is a column family
- ((LazyHBaseCellMap) getFields()[fieldID]).init(
- rowResult, columnName);
+ ((LazyHBaseCellMap) getFields()[fieldID]).init(result, columnNameBytes);
} else {
// it is a column
- if (rowResult.containsKey(columnName)) {
- ref = new ByteArrayRef();
- ref.setData(rowResult.get(columnName).getValue());
- } else {
+ byte [] res = result.getValue(columnNameBytes);
+ if (res == null) {
return null;
+ } else {
+ ref = new ByteArrayRef();
+ ref.setData(res);
}
}
}
+
if (ref != null) {
getFields()[fieldID].init(ref, 0, ref.getData().length);
}
}
+
return getFields()[fieldID].getObject();
}
@@ -152,6 +158,7 @@ public class LazyHBaseRow extends LazySt
* Get the values of the fields as an ArrayList.
* @return The values of the fields as an ArrayList.
*/
+ @Override
public ArrayList<Object> getFieldsAsList() {
if (!getParsed()) {
parse();
@@ -166,10 +173,9 @@ public class LazyHBaseRow extends LazySt
}
return cachedList;
}
-
+
@Override
public Object getObject() {
return this;
}
-
}
Modified: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestSetup.java Thu Jul 22 19:13:56 2010
@@ -37,8 +37,8 @@ import org.apache.zookeeper.Watcher;
* HBaseTestSetup defines HBase-specific test fixtures which are
* reused across testcases.
*/
-public class HBaseTestSetup extends TestSetup
-{
+public class HBaseTestSetup extends TestSetup {
+
private MiniHBaseCluster hbaseCluster;
private MiniZooKeeperCluster zooKeeperCluster;
private int zooKeeperPort;
@@ -49,7 +49,7 @@ public class HBaseTestSetup extends Test
public HBaseTestSetup(Test test) {
super(test);
}
-
+
void preTest(HiveConf conf) throws Exception {
if (hbaseCluster == null) {
// We set up fixtures on demand for the first testcase, and leave
Modified: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Thu Jul 22 19:13:56 2010
@@ -15,17 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hive.hbase;
+package org.apache.hadoop.hive.hbase;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import junit.framework.TestCase;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
@@ -38,8 +41,6 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import junit.framework.TestCase;
-
/**
* Tests the HBaseSerDe class.
*/
@@ -54,32 +55,39 @@ public class TestHBaseSerDe extends Test
Configuration conf = new Configuration();
Properties tbl = createProperties();
serDe.initialize(conf, tbl);
-
+
byte[] colabyte = "cola:abyte".getBytes();
byte[] colbshort = "colb:ashort".getBytes();
byte[] colcint = "colc:aint".getBytes();
byte[] colalong = "cola:along".getBytes();
byte[] colbdouble = "colb:adouble".getBytes();
byte[] colcstring = "colc:astring".getBytes();
-
+
// Data
- HbaseMapWritable<byte[], Cell> cells =
- new HbaseMapWritable<byte[], Cell>();
- cells.put(colabyte, new Cell("123".getBytes(), 0));
- cells.put(colbshort, new Cell("456".getBytes(), 0));
- cells.put(colcint, new Cell("789".getBytes(), 0));
- cells.put(colalong, new Cell("1000".getBytes(), 0));
- cells.put(colbdouble, new Cell("5.3".getBytes(), 0));
- cells.put(colcstring, new Cell("hive and hadoop".getBytes(), 0));
- RowResult rr = new RowResult("test-row1".getBytes(), cells);
- BatchUpdate bu = new BatchUpdate("test-row1".getBytes());
- bu.put(colabyte, "123".getBytes());
- bu.put(colbshort, "456".getBytes());
- bu.put(colcint, "789".getBytes());
- bu.put(colalong, "1000".getBytes());
- bu.put(colbdouble, "5.3".getBytes());
- bu.put(colcstring, "hive and hadoop".getBytes());
-
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+ colabyte, 0, Bytes.toBytes("123")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+ colbshort, 0, Bytes.toBytes("456")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+ colcint, 0, Bytes.toBytes("789")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+ colalong, 0, Bytes.toBytes("1000")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+ colbdouble, 0, Bytes.toBytes("5.3")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
+ colcstring, 0, Bytes.toBytes("hive and hadoop")));
+ Result r = new Result(kvs);
+
+ Put p = new Put(Bytes.toBytes("test-row1"));
+
+ p.add(colabyte, 0, Bytes.toBytes("123"));
+ p.add(colbshort, 0, Bytes.toBytes("456"));
+ p.add(colcint, 0, Bytes.toBytes("789"));
+ p.add(colalong, 0, Bytes.toBytes("1000"));
+ p.add(colbdouble, 0, Bytes.toBytes("5.3"));
+ p.add(colcstring, 0, Bytes.toBytes("hive and hadoop"));
+
Object[] expectedFieldsData = {
new Text("test-row1"),
new ByteWritable((byte)123),
@@ -89,12 +97,12 @@ public class TestHBaseSerDe extends Test
new DoubleWritable(5.3),
new Text("hive and hadoop")
};
-
- deserializeAndSerialize(serDe, rr, bu, expectedFieldsData);
+
+ deserializeAndSerialize(serDe, r, p, expectedFieldsData);
}
private void deserializeAndSerialize(
- HBaseSerDe serDe, RowResult rr, BatchUpdate bu,
+ HBaseSerDe serDe, Result r, Put p,
Object[] expectedFieldsData) throws SerDeException {
// Get the row structure
@@ -102,34 +110,33 @@ public class TestHBaseSerDe extends Test
serDe.getObjectInspector();
List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
assertEquals(7, fieldRefs.size());
-
+
// Deserialize
- Object row = serDe.deserialize(rr);
+ Object row = serDe.deserialize(r);
for (int i = 0; i < fieldRefs.size(); i++) {
Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
if (fieldData != null) {
- fieldData = ((LazyPrimitive)fieldData).getWritableObject();
+ fieldData = ((LazyPrimitive<?, ?>)fieldData).getWritableObject();
}
assertEquals("Field " + i, expectedFieldsData[i], fieldData);
}
- // Serialize
- assertEquals(BatchUpdate.class, serDe.getSerializedClass());
- BatchUpdate serializedBU = (BatchUpdate)serDe.serialize(row, oi);
- assertEquals("Serialized data", bu.toString(), serializedBU.toString());
+ // Serialize
+ assertEquals(Put.class, serDe.getSerializedClass());
+ Put serializedPut = (Put) serDe.serialize(row, oi);
+ assertEquals("Serialized data", p.toString(), serializedPut.toString());
}
private Properties createProperties() {
Properties tbl = new Properties();
-
+
// Set the configuration parameters
tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
tbl.setProperty("columns",
"key,abyte,ashort,aint,along,adouble,astring");
tbl.setProperty("columns.types",
"string,tinyint:smallint:int:bigint:double:string");
- tbl.setProperty(HBaseSerDe.HBASE_COL_MAPPING,
+ tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
"cola:abyte,colb:ashort,colc:aint,cola:along,colb:adouble,colc:astring");
return tbl;
}
-
}
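
[Illustrative sketch, not part of r966811.] On the serialization side of the same test, the expected write object changes from BatchUpdate to Put. The small self-contained example below mirrors the fixture construction the updated TestHBaseSerDe uses; the row and column names are illustrative, and toString() appears only because that is what the test compares for the expected and serialized Put.

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutFixtureSketch {
  public static void main(String[] args) {
    // Put replaces BatchUpdate as the object the SerDe now serializes to.
    Put p = new Put(Bytes.toBytes("test-row1"));

    // add(column, timestamp, value) takes the combined "family:qualifier" name,
    // matching the old BatchUpdate.put(column, value) calls one for one.
    p.add(Bytes.toBytes("cola:abyte"), 0, Bytes.toBytes("123"));
    p.add(Bytes.toBytes("colb:ashort"), 0, Bytes.toBytes("456"));
    p.add(Bytes.toBytes("colc:astring"), 0, Bytes.toBytes("hive and hadoop"));

    // The test asserts equality of the expected and serialized Put via toString().
    System.out.println(p.toString());
  }
}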
Modified: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java?rev=966811&r1=966810&r2=966811&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java Thu Jul 22 19:13:56 2010
@@ -15,14 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.hbase;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.RowResult;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
@@ -34,8 +38,6 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
-import junit.framework.TestCase;
-
/**
* TestLazyHBaseObject is a test for the LazyHBaseXXX classes.
*/
@@ -47,44 +49,56 @@ public class TestLazyHBaseObject extends
// Map of Integer to String
Text nullSequence = new Text("\\N");
ObjectInspector oi = LazyFactory.createLazyObjectInspector(
- TypeInfoUtils.getTypeInfosFromTypeString("map<int,string>").get(0),
+ TypeInfoUtils.getTypeInfosFromTypeString("map<int,string>").get(0),
new byte[]{(byte)1, (byte)2}, 0, nullSequence, false, (byte)0);
-
+
LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi);
-
- // Intialize a row result
- HbaseMapWritable<byte[], Cell> cells = new HbaseMapWritable<byte[], Cell>();
- cells.put("cfa:col1".getBytes(), new Cell("cfacol1".getBytes(), 0));
- cells.put("cfa:col2".getBytes(), new Cell("cfacol2".getBytes(), 0));
- cells.put("cfb:2".getBytes(), new Cell("def".getBytes(), 0));
- cells.put("cfb:-1".getBytes(), new Cell("".getBytes(), 0));
- cells.put("cfb:0".getBytes(), new Cell("0".getBytes(), 0));
- cells.put("cfb:8".getBytes(), new Cell("abc".getBytes(), 0));
- cells.put("cfc:col3".getBytes(), new Cell("cfccol3".getBytes(), 0));
-
- RowResult rr = new RowResult("test-row".getBytes(), cells);
-
- b.init(rr, "cfb:");
-
+
+ // Initialize a result
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa:col1"), 0, Bytes.toBytes("cfacol1")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa:col2"), 0, Bytes.toBytes("cfacol2")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb:2"), 0, Bytes.toBytes("def")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb:-1"), 0, Bytes.toBytes("")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb:0"), 0, Bytes.toBytes("0")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb:8"), 0, Bytes.toBytes("abc")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfc:col3"), 0, Bytes.toBytes("cfccol3")));
+
+ Result r = new Result(kvs);
+
+ b.init(r, "cfb".getBytes());
+
assertEquals(
new Text("def"),
((LazyString)b.getMapValueElement(
new IntWritable(2))).getWritableObject());
+
assertNull(b.getMapValueElement(new IntWritable(-1)));
+
assertEquals(
new Text("0"),
((LazyString)b.getMapValueElement(
new IntWritable(0))).getWritableObject());
+
assertEquals(
new Text("abc"),
((LazyString)b.getMapValueElement(
new IntWritable(8))).getWritableObject());
+
assertNull(b.getMapValueElement(new IntWritable(12345)));
-
+
assertEquals("{0:'0',2:'def',8:'abc'}".replace('\'', '\"'),
SerDeUtils.getJSONString(b, oi));
}
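
[Illustrative sketch, not part of r966811.] A condensed version of the new LazyHBaseCellMap initialization exercised by the test above: init() now takes a Result plus the column family as raw bytes ("cfb") instead of a RowResult plus the "cfb:" prefix string. The row, family, and value below are made up; the inspector setup is copied from the test, and the import paths for the lazy object inspector classes are assumed from Hive trunk of the time.

package org.apache.hadoop.hive.hbase;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;

public class CellMapInitSketch {
  public static void main(String[] args) {
    // Same map<int,string> inspector setup as the test above.
    Text nullSequence = new Text("\\N");
    ObjectInspector oi = LazyFactory.createLazyObjectInspector(
        TypeInfoUtils.getTypeInfosFromTypeString("map<int,string>").get(0),
        new byte[]{(byte) 1, (byte) 2}, 0, nullSequence, false, (byte) 0);
    LazyHBaseCellMap m = new LazyHBaseCellMap((LazyMapObjectInspector) oi);

    // One cell in family "cfb", qualifier "2", value "def".
    List<KeyValue> kvs = new ArrayList<KeyValue>();
    kvs.add(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cfb"),
        Bytes.toBytes("2"), 0, Bytes.toBytes("def")));
    Result r = new Result(kvs);

    // Old call: m.init(rowResult, "cfb:"); the new call passes the family bytes.
    m.init(r, Bytes.toBytes("cfb"));
    System.out.println(SerDeUtils.getJSONString(m, oi));  // {2:"def"}
  }
}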
-
+
/**
* Test the LazyMap class with String-to-String.
*/
@@ -92,139 +106,171 @@ public class TestLazyHBaseObject extends
// Map of String to String
Text nullSequence = new Text("\\N");
ObjectInspector oi = LazyFactory.createLazyObjectInspector(
- TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get(0),
+ TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get(0),
new byte[]{(byte)'#', (byte)'\t'}, 0, nullSequence, false, (byte)0);
-
+
LazyHBaseCellMap b = new LazyHBaseCellMap((LazyMapObjectInspector) oi);
-
- // Intialize a row result
- HbaseMapWritable<byte[], Cell> cells =
- new HbaseMapWritable<byte[], Cell>();
- cells.put("cfa:col1".getBytes(), new Cell("cfacol1".getBytes(), 0));
- cells.put("cfa:col2".getBytes(), new Cell("cfacol2".getBytes(), 0));
- cells.put("cfb:2".getBytes(), new Cell("d\tf".getBytes(), 0));
- cells.put("cfb:-1".getBytes(), new Cell("".getBytes(), 0));
- cells.put("cfb:0".getBytes(), new Cell("0".getBytes(), 0));
- cells.put("cfb:8".getBytes(), new Cell("abc".getBytes(), 0));
- cells.put("cfc:col3".getBytes(), new Cell("cfccol3".getBytes(), 0));
-
- RowResult rr = new RowResult("test-row".getBytes(), cells);
-
- b.init(rr, "cfb:");
-
+
+ // Initialize a result
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("col1"), 0, Bytes.toBytes("cfacol1")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("col2"), 0, Bytes.toBytes("cfacol2")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("2"), 0, Bytes.toBytes("d\tf")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("-1"), 0, Bytes.toBytes("")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("0"), 0, Bytes.toBytes("0")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("8"), 0, Bytes.toBytes("abc")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfc"), Bytes.toBytes("col3"), 0, Bytes.toBytes("cfccol3")));
+
+ Result r = new Result(kvs);
+ b.init(r, "cfb".getBytes());
+
assertEquals(
new Text("d\tf"),
((LazyString)b.getMapValueElement(
new Text("2"))).getWritableObject());
+
assertNull(b.getMapValueElement(new Text("-1")));
+
assertEquals(
new Text("0"),
((LazyString)b.getMapValueElement(
new Text("0"))).getWritableObject());
+
assertEquals(
new Text("abc"),
((LazyString)b.getMapValueElement(
new Text("8"))).getWritableObject());
+
assertNull(b.getMapValueElement(new Text("-")));
-
+
assertEquals(
"{'0':'0','2':'d\\tf','8':'abc'}".replace('\'', '\"'),
SerDeUtils.getJSONString(b, oi));
}
-
+
/**
* Test the LazyHBaseRow class with one-for-one mappings between
* Hive fields and HBase columns.
*/
public void testLazyHBaseRow1() {
- List<TypeInfo> fieldTypeInfos =
+ List<TypeInfo> fieldTypeInfos =
TypeInfoUtils.getTypeInfosFromTypeString(
"string,int,array<string>,map<string,string>,string");
List<String> fieldNames = Arrays.asList(
new String[]{"key", "a", "b", "c", "d"});
Text nullSequence = new Text("\\N");
-
- List<String> hbaseColumnNames =
+
+ List<String> hbaseColumnNames =
Arrays.asList(new String[]{":key", "cfa:a", "cfa:b", "cfb:c", "cfb:d"});
-
+ List<byte []> hbaseColumnNamesBytes = HBaseSerDe.initColumnNamesBytes(hbaseColumnNames);
+
ObjectInspector oi = LazyFactory.createLazyStructInspector(fieldNames,
fieldTypeInfos, new byte[] {' ', ':', '='},
nullSequence, false, false, (byte)0);
LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi);
- HbaseMapWritable<byte[], Cell> cells =
- new HbaseMapWritable<byte[], Cell>();
-
- cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
- cells.put("cfa:b".getBytes(), new Cell("a:b:c".getBytes(), 0));
- cells.put("cfb:c".getBytes(), new Cell("d=e:f=g".getBytes(), 0));
- cells.put("cfb:d".getBytes(), new Cell("hi".getBytes(), 0));
- RowResult rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
- assertEquals(
- ("{'key':'test-row','a':123,'b':['a','b','c'],"
- + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""),
- SerDeUtils.getJSONString(o, oi));
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
- cells.clear();
- cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
- cells.put("cfb:c".getBytes(), new Cell("d=e:f=g".getBytes(), 0));
- rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
- assertEquals(
- ("{'key':'test-row','a':123,'b':null,"
- + "'c':{'d':'e','f':'g'},'d':null}").replace("'", "\""),
- SerDeUtils.getJSONString(o, oi));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a:b:c")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("d=e:f=g")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("hi")));
+
+ Result r = new Result(kvs);
- cells.clear();
- cells.put("cfa:b".getBytes(), new Cell("a".getBytes(), 0));
- cells.put("cfb:c".getBytes(), new Cell("d=\\N:f=g:h".getBytes(), 0));
- cells.put("cfb:d".getBytes(), new Cell("no".getBytes(), 0));
- rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
assertEquals(
- ("{'key':'test-row','a':null,'b':['a'],"
- + "'c':{'d':null,'f':'g','h':null},'d':'no'}").replace("'", "\""),
+ ("{'key':'test-row','a':123,'b':['a','b','c'],"
+ + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""),
SerDeUtils.getJSONString(o, oi));
- cells.clear();
- cells.put("cfa:b".getBytes(), new Cell(":a::".getBytes(), 0));
- cells.put("cfb:d".getBytes(), new Cell("no".getBytes(), 0));
- rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
+ kvs.clear();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("d=e:f=g")));
+ r = new Result(kvs);
+
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
+ assertEquals(
+ ("{'key':'test-row','a':123,'b':null,"
+ + "'c':{'d':'e','f':'g'},'d':null}").replace("'", "\""),
+ SerDeUtils.getJSONString(o, oi));
+
+ kvs.clear();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("d=\\N:f=g:h")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+ r = new Result(kvs);
+
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
+ assertEquals(
+ ("{'key':'test-row','a':null,'b':['a'],"
+ + "'c':{'d':null,'f':'g','h':null},'d':'no'}").replace("'", "\""),
+ SerDeUtils.getJSONString(o, oi));
+
+ kvs.clear();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes(":a::")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+ r = new Result(kvs);
+
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
assertEquals(
("{'key':'test-row','a':null,'b':['','a','',''],"
+ "'c':null,'d':'no'}").replace("'", "\""),
SerDeUtils.getJSONString(o, oi));
- cells.clear();
- cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
- cells.put("cfa:b".getBytes(), new Cell("".getBytes(), 0));
- cells.put("cfb:c".getBytes(), new Cell("".getBytes(), 0));
- cells.put("cfb:d".getBytes(), new Cell("".getBytes(), 0));
- rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
+ kvs.clear();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("")));
+ r = new Result(kvs);
+
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
assertEquals(
"{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
SerDeUtils.getJSONString(o, oi));
}
-
+
/**
* Test the LazyHBaseRow class with a mapping from a Hive field to
* an HBase column family.
*/
public void testLazyHBaseRow2() {
// column family is mapped to Map<string,string>
- List<TypeInfo> fieldTypeInfos =
+ List<TypeInfo> fieldTypeInfos =
TypeInfoUtils.getTypeInfosFromTypeString(
"string,int,array<string>,map<string,string>,string");
List<String> fieldNames = Arrays.asList(
new String[]{"key", "a", "b", "c", "d"});
Text nullSequence = new Text("\\N");
-
- List<String> hbaseColumnNames =
+
+ List<String> hbaseColumnNames =
Arrays.asList(new String[]{":key", "cfa:a", "cfa:b", "cfb:", "cfc:d"});
-
+ List<byte []> hbaseColumnNamesBytes = HBaseSerDe.initColumnNamesBytes(hbaseColumnNames);
+
ObjectInspector oi = LazyFactory.createLazyStructInspector(
fieldNames,
fieldTypeInfos,
@@ -232,59 +278,79 @@ public class TestLazyHBaseObject extends
nullSequence, false, false, (byte) 0);
LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi);
- HbaseMapWritable<byte[], Cell> cells =
- new HbaseMapWritable<byte[], Cell>();
-
- cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
- cells.put("cfa:b".getBytes(), new Cell("a:b:c".getBytes(), 0));
- cells.put("cfb:d".getBytes(), new Cell("e".getBytes(), 0));
- cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0));
- cells.put("cfc:d".getBytes(), new Cell("hi".getBytes(), 0));
- RowResult rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a:b:c")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("e")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("f"), 0, Bytes.toBytes("g")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("hi")));
+
+ Result r = new Result(kvs);
+
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
assertEquals(
("{'key':'test-row','a':123,'b':['a','b','c'],"
- + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""),
+ + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""),
SerDeUtils.getJSONString(o, oi));
- cells.clear();
- cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
- cells.put("cfb:d".getBytes(), new Cell("e".getBytes(), 0));
- cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0));
- rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
+ kvs.clear();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("e")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("f"), 0, Bytes.toBytes("g")));
+ r = new Result(kvs);
+
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
assertEquals(
("{'key':'test-row','a':123,'b':null,"
+ "'c':{'d':'e','f':'g'},'d':null}").replace("'", "\""),
SerDeUtils.getJSONString(o, oi));
- cells.clear();
- cells.put("cfa:b".getBytes(), new Cell("a".getBytes(), 0));
- cells.put("cfb:f".getBytes(), new Cell("g".getBytes(), 0));
- cells.put("cfc:d".getBytes(), new Cell("no".getBytes(), 0));
- rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
+ kvs.clear();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfb"), Bytes.toBytes("f"), 0, Bytes.toBytes("g")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+ r = new Result(kvs);
+
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
assertEquals(
("{'key':'test-row','a':null,'b':['a'],"
+ "'c':{'f':'g'},'d':'no'}").replace("'", "\""),
SerDeUtils.getJSONString(o, oi));
- cells.clear();
- cells.put("cfa:b".getBytes(), new Cell(":a::".getBytes(), 0));
- cells.put("cfc:d".getBytes(), new Cell("no".getBytes(), 0));
- rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
+ kvs.clear();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes(":a::")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+ r = new Result(kvs);
+
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
assertEquals(
("{'key':'test-row','a':null,'b':['','a','',''],"
+ "'c':{},'d':'no'}").replace("'", "\""),
SerDeUtils.getJSONString(o, oi));
- cells.clear();
- cells.put("cfa:a".getBytes(), new Cell("123".getBytes(), 0));
- cells.put("cfa:b".getBytes(), new Cell("".getBytes(), 0));
- cells.put("cfc:d".getBytes(), new Cell("".getBytes(), 0));
- rr = new RowResult("test-row".getBytes(), cells);
- o.init(rr, hbaseColumnNames);
+ kvs.clear();
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("")));
+ kvs.add(new KeyValue(Bytes.toBytes("test-row"),
+ Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("")));
+ r = new Result(kvs);
+
+ o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
assertEquals(
"{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
SerDeUtils.getJSONString(o, oi));