You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/10/07 17:15:28 UTC
svn commit: r1180106 - in /incubator/hcatalog/trunk: ./ storage-drivers/
storage-drivers/hbase/src/ storage-drivers/hbase/src/java/
storage-drivers/hbase/src/java/org/
storage-drivers/hbase/src/java/org/apache/
storage-drivers/hbase/src/java/org/apache...
Author: khorgath
Date: Fri Oct 7 17:15:28 2011
New Revision: 1180106
URL: http://svn.apache.org/viewvc?rev=1180106&view=rev
Log:
HCATALOG-74 ResultConverter for HBase Storage Drivers
Added:
incubator/hcatalog/trunk/storage-drivers/hbase/src/
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-drivers/build.xml
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1180106&r1=1180105&r2=1180106&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Oct 7 17:15:28 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-74. ResultConverter for HBase Storage Drivers (avandana via khorgath)
+
HCAT-89. Support for creating non-native tables (avandana via gates)
HCAT-60. Refactor HCatalog to support non-filebased outputformats (toffer via gates)
Modified: incubator/hcatalog/trunk/storage-drivers/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/build.xml?rev=1180106&r1=1180105&r2=1180106&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/build.xml (original)
+++ incubator/hcatalog/trunk/storage-drivers/build.xml Fri Oct 7 17:15:28 2011
@@ -25,7 +25,7 @@
</condition>
<!-- add storage drivers here -->
<echo>Executing storage-driver "${target}" for hbase</echo>
- <!-- ant target="${target}" dir="hbase" inheritAll="false" useNativeBasedir="true"/ -->
+ <ant target="${target}" dir="hbase" inheritAll="false" useNativeBasedir="true"/>
</target>
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java Fri Oct 7 17:15:28 2011
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Constants class for constants used in the HBase Storage Drivers module
+ */
+class HBaseConstants {
+
+ /** key used to define the version number for the HBaseOutputStorage driver to use when writing out data for a job */
+ public static final String PROPERTY_OUTPUT_VERSION_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputVersion";
+
+ /** key used to define the name of the table to write to */
+ public static final String PROPERTY_OUTPUT_TABLE_NAME_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName";
+
+ /** key used to define the column mapping of hbase to hcatalog schema */
+ public static final String PROPERTY_COLUMN_MAPPING_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+ HBaseSerDe.HBASE_COLUMNS_MAPPING;
+
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java Fri Oct 7 17:15:28 2011
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.hbase.LazyHBaseRow;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of ResultConverter using HBaseSerDe.
+ * The mapping between the HBase schema and the HCatRecord schema is defined by
+ * {@link HBaseConstants#PROPERTY_COLUMN_MAPPING_KEY}.
+ */
+class HBaseSerDeResultConverter implements ResultConverter {
+ private HBaseSerDe serDe;
+ private HCatSchema schema;
+ private HCatSchema outputSchema;
+ private StructObjectInspector hCatRecordOI;
+ private StructObjectInspector lazyHBaseRowOI;
+ private final Long outputVersion;
+
+ /**
+ * @param schema table schema
+ * @param outputSchema schema of projected output
+ * @param hcatProperties table properties
+ * @throws IOException thrown if hive's HBaseSerDe couldn't be initialized
+ */
+ public HBaseSerDeResultConverter(HCatSchema schema,
+ HCatSchema outputSchema,
+ Properties hcatProperties) throws IOException {
+
+ hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY));
+
+ if(hcatProperties.containsKey(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY))
+ outputVersion = Long.parseLong(hcatProperties.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY));
+ else
+ outputVersion = null;
+
+ this.schema = schema;
+ if(outputSchema == null) {
+ this.outputSchema = schema;
+ }
+ else {
+ this.outputSchema = outputSchema;
+ }
+
+ hCatRecordOI = createStructObjectInspector();
+ try {
+ serDe = new HBaseSerDe();
+ serDe.initialize(new Configuration(),hcatProperties);
+ lazyHBaseRowOI = (StructObjectInspector) serDe.getObjectInspector();
+ } catch (SerDeException e) {
+ throw new IOException("SerDe initialization failed",e);
+ }
+ }
+
+ @Override
+ public Put convert(HCatRecord record) throws IOException {
+ try {
+ //small hack to explicitly specify timestamp/version number to use
+ //since HBaseSerDe does not support specifying it
+ //will have to decide whether we will write our own or contribute code
+ //for the SerDe
+ Put put = (Put)serDe.serialize(record.getAll(),hCatRecordOI);
+ Put res;
+ if(outputVersion == null) {
+ res = put;
+ }
+ else {
+ res = new Put(put.getRow(),outputVersion.longValue());
+ for(List<KeyValue> row: put.getFamilyMap().values()) {
+ for(KeyValue el: row) {
+ res.add(el.getFamily(),el.getQualifier(),el.getValue());
+ }
+ }
+ }
+ return res;
+ } catch (SerDeException e) {
+ throw new IOException("serialization failed",e);
+ }
+ }
+
+ @Override
+ public HCatRecord convert(Result result) throws IOException {
+ // Deserialize the HBase Result into a lazy struct and then convert that
+ // struct to an HCatRecord.
+ LazyHBaseRow struct;
+ try {
+ struct = (LazyHBaseRow)serDe.deserialize(result);
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+
+ List<Object> outList = new ArrayList<Object>(outputSchema.size());
+
+ String colName;
+ Integer index;
+
+ for(HCatFieldSchema col : outputSchema.getFields()){
+
+ colName = col.getName().toLowerCase();
+ index = outputSchema.getPosition(colName);
+
+ if(index != null){
+ StructField field = lazyHBaseRowOI.getStructFieldRef(colName);
+ outList.add(getTypedObj(lazyHBaseRowOI.getStructFieldData(struct, field), field.getFieldObjectInspector()));
+ }
+ }
+ return new DefaultHCatRecord(outList);
+ }
+
+ private Object getTypedObj(Object data, ObjectInspector oi) throws IOException{
+ // The real work-horse method. We are gobbling up all the laziness benefits
+ // of Hive-LazyHBaseRow by deserializing everything and creating crisp HCatRecord
+ // with crisp Java objects inside it. We have to do it because higher layer
+ // may not know how to do it.
+ //TODO leverage laziness of SerDe
+ switch(oi.getCategory()){
+
+ case PRIMITIVE:
+ return ((PrimitiveObjectInspector)oi).getPrimitiveJavaObject(data);
+
+ case MAP:
+ MapObjectInspector moi = (MapObjectInspector)oi;
+ Map<?,?> lazyMap = moi.getMap(data);
+ ObjectInspector keyOI = moi.getMapKeyObjectInspector();
+ ObjectInspector valOI = moi.getMapValueObjectInspector();
+ Map<Object,Object> typedMap = new HashMap<Object,Object>(lazyMap.size());
+ for(Map.Entry<?,?> e : lazyMap.entrySet()){
+ typedMap.put(getTypedObj(e.getKey(), keyOI), getTypedObj(e.getValue(), valOI));
+ }
+ return typedMap;
+
+ case LIST:
+ ListObjectInspector loi = (ListObjectInspector)oi;
+ List<?> lazyList = loi.getList(data);
+ ObjectInspector elemOI = loi.getListElementObjectInspector();
+ List<Object> typedList = new ArrayList<Object>(lazyList.size());
+ Iterator<?> itr = lazyList.listIterator();
+ while(itr.hasNext()){
+ typedList.add(getTypedObj(itr.next(),elemOI));
+ }
+ return typedList;
+
+ case STRUCT:
+ StructObjectInspector soi = (StructObjectInspector)oi;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> typedStruct = new ArrayList<Object>(fields.size());
+ for(StructField field : fields){
+ typedStruct.add( getTypedObj(soi.getStructFieldData(data, field), field.getFieldObjectInspector()));
+ }
+ return typedStruct;
+
+
+ default:
+ throw new IOException("Don't know how to deserialize: "+oi.getCategory());
+
+ }
+ }
+
+ private StructObjectInspector createStructObjectInspector() throws IOException {
+
+ if( outputSchema == null ) {
+ throw new IOException("Invalid output schema specified");
+ }
+
+ List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+ List<String> fieldNames = new ArrayList<String>();
+
+ for(HCatFieldSchema hcatFieldSchema : outputSchema.getFields()) {
+ TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString(hcatFieldSchema.getTypeString());
+
+ fieldNames.add(hcatFieldSchema.getName());
+ fieldInspectors.add(getObjectInspector(type));
+ }
+
+ StructObjectInspector structInspector = ObjectInspectorFactory.
+ getStandardStructObjectInspector(fieldNames, fieldInspectors);
+ return structInspector;
+ }
+
+ private ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+
+ switch(type.getCategory()) {
+
+ case PRIMITIVE :
+ PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+ return PrimitiveObjectInspectorFactory.
+ getPrimitiveJavaObjectInspector(primitiveType.getPrimitiveCategory());
+
+ case MAP :
+ MapTypeInfo mapType = (MapTypeInfo) type;
+ MapObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
+ getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
+ return mapInspector;
+
+ case LIST :
+ ListTypeInfo listType = (ListTypeInfo) type;
+ ListObjectInspector listInspector = ObjectInspectorFactory.getStandardListObjectInspector(
+ getObjectInspector(listType.getListElementTypeInfo()));
+ return listInspector;
+
+ case STRUCT :
+ StructTypeInfo structType = (StructTypeInfo) type;
+ List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
+
+ List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+ for(TypeInfo fieldType : fieldTypes) {
+ fieldInspectors.add(getObjectInspector(fieldType));
+ }
+
+ StructObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ structType.getAllStructFieldNames(), fieldInspectors);
+ return structInspector;
+
+ default :
+ throw new IOException("Unknown field schema type");
+ }
+ }
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java Fri Oct 7 17:15:28 2011
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+
+/**
+ * Interface used to define conversion of HCatRecord to and from Native HBase write (Put) and read (Result) objects.
+ * How the actual mapping is defined between an HBase Table's schema and an HCatalog Table's schema
+ * is up to the underlying implementation
+ */
+interface ResultConverter {
+
+ /**
+ * convert HCatRecord instance to an HBase Put, used when writing out data.
+ * @param record instance to convert
+ * @return converted Put instance
+ * @throws IOException
+ */
+ Put convert(HCatRecord record) throws IOException;
+
+ /**
+ * convert HBase Result to HCatRecord instance, used when reading data.
+ * @param result instance to convert
+ * @return converted HCatRecord instance
+ * @throws IOException
+ */
+ HCatRecord convert(Result result) throws IOException;
+
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests Fri Oct 7 17:15:28 2011
@@ -0,0 +1 @@
+**/Test*.java
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests Fri Oct 7 17:15:28 2011
@@ -0,0 +1 @@
+
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml Fri Oct 7 17:15:28 2011
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="ConsoleAppender" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.TTCCLayout"/>
+ </appender>
+
+
+ <root>
+ <priority value ="INFO" />
+ <appender-ref ref="ConsoleAppender" />
+ </root>
+ <!--<logger name="org.apache.hadoop.mapred">-->
+ <!--<level value="trace"/>-->
+ <!--</logger>-->
+ <!--<logger name="org.apache.hadoop.hbase">-->
+ <!--<level value="trace"/>-->
+ <!--</logger>-->
+</log4j:configuration>
\ No newline at end of file
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java Fri Oct 7 17:15:28 2011
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test HBaseSerDeResultConverter by manually creating records to convert to and from HBase objects
+ */
+public class TestHBaseSerDeResultConverter {
+
+ private Properties createProperties() {
+ Properties tbl = new Properties();
+ // Set the configuration parameters
+ tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+ tbl.setProperty("columns","key,aint,astring,amap");
+ tbl.setProperty("columns.types","string:int:string:map<string,int>");
+ tbl.setProperty(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+ HBaseSerDe.HBASE_COLUMNS_MAPPING,
+ ":key,my_family:my_qualifier1,my_family:my_qualifier2,my_family2:");
+ tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+ tbl.setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,"1");
+ return tbl;
+ }
+
+ private HCatSchema createHCatSchema() throws HCatException {
+ HCatSchema subSchema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+ subSchema.append(new HCatFieldSchema(null, HCatFieldSchema.Type.INT,""));
+
+ HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+ schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,""));
+ schema.append(new HCatFieldSchema("aint", HCatFieldSchema.Type.INT,""));
+ schema.append(new HCatFieldSchema("astring", HCatFieldSchema.Type.STRING,""));
+ schema.append(new HCatFieldSchema("amap", HCatFieldSchema.Type.MAP,HCatFieldSchema.Type.STRING,subSchema,""));
+ return schema;
+ }
+
+ @Test
+ public void testDeserialize() throws IOException {
+ HBaseSerDeResultConverter converter = new HBaseSerDeResultConverter(createHCatSchema(),
+ null,
+ createProperties());
+ //test integer
+ Result result = new Result(new KeyValue[]{new KeyValue(Bytes.toBytes("row"),
+ Bytes.toBytes("my_family"),
+ Bytes.toBytes("my_qualifier1"),
+ 0,
+ //This is how Hive's SerDe serializes numbers
+ Bytes.toBytes("123")),
+ //test string
+ new KeyValue(Bytes.toBytes("row"),
+ Bytes.toBytes("my_family"),
+ Bytes.toBytes("my_qualifier2"),
+ 0,
+ Bytes.toBytes("onetwothree")),
+ //test family map
+ new KeyValue(Bytes.toBytes("row"),
+ Bytes.toBytes("my_family2"),
+ Bytes.toBytes("one"),
+ 0,
+ Bytes.toBytes("1")),
+ new KeyValue(Bytes.toBytes("row"),
+ Bytes.toBytes("my_family2"),
+ Bytes.toBytes("two"),
+ 0,
+ Bytes.toBytes("2"))});
+
+ HCatRecord record = converter.convert(result);
+
+ assertEquals(Bytes.toString(result.getRow()), record.get(0).toString());
+ assertEquals(Integer.valueOf(
+ Bytes.toString(
+ result.getValue(Bytes.toBytes("my_family"), Bytes.toBytes("my_qualifier1")))),
+ record.get(1));
+ assertEquals(Bytes.toString(
+ result.getValue(Bytes.toBytes("my_family"), Bytes.toBytes("my_qualifier2"))),
+ record.get(2).toString());
+ Map<String,Integer> recordMap = (Map<String,Integer>)record.get(3);
+ Map<byte[],byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("my_family2"));
+ assertEquals(Integer.valueOf(
+ Bytes.toString(
+ familyMap.get(Bytes.toBytes("one")))),
+ recordMap.get("one"));
+ assertEquals(Integer.valueOf(
+ Bytes.toString(
+ familyMap.get(Bytes.toBytes("two")))),
+ recordMap.get("two"));
+ }
+
+ @Test
+ public void testSerialize() throws IOException {
+ HCatSchema schema = createHCatSchema();
+ HBaseSerDeResultConverter converter = new HBaseSerDeResultConverter(schema,
+ null,
+ createProperties());
+ HCatRecord in = new DefaultHCatRecord(4);
+ //row key
+ in.set(0,"row");
+ //test integer
+ in.set(1,123);
+ //test string
+ in.set(2,"onetwothree");
+ //test map
+ Map<String,Integer> map = new HashMap<String,Integer>();
+ map.put("one",1);
+ map.put("two",2);
+ in.set(3,map);
+
+ Put put = converter.convert(in);
+
+ assertEquals(in.get(0).toString(),Bytes.toString(put.getRow()));
+ assertEquals(in.get(1),
+ Integer.valueOf(
+ Bytes.toString(
+ put.get(Bytes.toBytes("my_family"),
+ Bytes.toBytes("my_qualifier1")).get(0).getValue())));
+ assertEquals(1l,
+ put.get(Bytes.toBytes("my_family"),
+ Bytes.toBytes("my_qualifier1")).get(0).getTimestamp());
+ assertEquals(in.get(2),
+ Bytes.toString(
+ put.get(Bytes.toBytes("my_family"),
+ Bytes.toBytes("my_qualifier2")).get(0).getValue()));
+ assertEquals(1l,
+ put.get(Bytes.toBytes("my_family"),
+ Bytes.toBytes("my_qualifier2")).get(0).getTimestamp());
+ assertEquals(map.get("one"),
+ Integer.valueOf(
+ Bytes.toString(
+ put.get(Bytes.toBytes("my_family2"),
+ Bytes.toBytes("one")).get(0).getValue())));
+ assertEquals(1l,
+ put.get(Bytes.toBytes("my_family2"),
+ Bytes.toBytes("one")).get(0).getTimestamp());
+ assertEquals(map.get("two"),
+ Integer.valueOf(Bytes.toString(
+ put.get("my_family2".getBytes(),
+ "two".getBytes()).get(0).getValue())));
+ assertEquals(1l,
+ put.get(Bytes.toBytes("my_family2"),
+ Bytes.toBytes("two")).get(0).getTimestamp());
+ }
+}