Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/10/07 17:15:28 UTC

svn commit: r1180106 - in /incubator/hcatalog/trunk: ./ storage-drivers/ storage-drivers/hbase/src/ storage-drivers/hbase/src/java/ storage-drivers/hbase/src/java/org/ storage-drivers/hbase/src/java/org/apache/ storage-drivers/hbase/src/java/org/apache...

Author: khorgath
Date: Fri Oct  7 17:15:28 2011
New Revision: 1180106

URL: http://svn.apache.org/viewvc?rev=1180106&view=rev
Log:
HCATALOG-74 ResultConverter for HBase Storage Drivers

Added:
    incubator/hcatalog/trunk/storage-drivers/hbase/src/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/build.xml

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1180106&r1=1180105&r2=1180106&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Oct  7 17:15:28 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-74. ResultConverter for HBase Storage Drivers (avandana via khorgath)
+
   HCAT-89. Support for creating non-native tables (avandana via gates)
 
   HCAT-60. Refactor HCatalog to support non-filebased outputformats (toffer via gates)

Modified: incubator/hcatalog/trunk/storage-drivers/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/build.xml?rev=1180106&r1=1180105&r2=1180106&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/build.xml (original)
+++ incubator/hcatalog/trunk/storage-drivers/build.xml Fri Oct  7 17:15:28 2011
@@ -25,7 +25,7 @@
         </condition>
         <!-- add storage drivers here -->
         <echo>Executing storage-driver &quot;${target}&quot; for hbase</echo>
-        <!-- ant target="${target}" dir="hbase" inheritAll="false" useNativeBasedir="true"/ -->
+        <ant target="${target}" dir="hbase" inheritAll="false" useNativeBasedir="true"/>
     </target>
 
 

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java Fri Oct  7 17:15:28 2011
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Constants used in the HBase storage drivers module
+ */
+class HBaseConstants {
+
+    /** key used to define the version number the HBase output storage driver should use when writing out data for a job */
+    public static final String PROPERTY_OUTPUT_VERSION_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputVersion";
+
+    /** key used to define the name of the table to write to */
+    public static final String PROPERTY_OUTPUT_TABLE_NAME_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.mapreduce.outputTableName";
+
+    /** key used to define the column mapping between the HBase and HCatalog schemas */
+    public static final String PROPERTY_COLUMN_MAPPING_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+ HBaseSerDe.HBASE_COLUMNS_MAPPING;
+
+}
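
For context, a minimal sketch of how a storage driver might populate these keys. The values below are hypothetical, standing in for what would normally come from the HCatalog table definition, and since HBaseConstants is package-private the snippet assumes code living in org.apache.hcatalog.hbase:

    import java.util.Properties;

    // Hypothetical helper for illustration only; real property values come
    // from the HCatalog table definition.
    static Properties exampleTableProperties() {
        Properties tblProps = new Properties();
        tblProps.setProperty(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, "my_hbase_table");
        tblProps.setProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY, ":key,cf1:q1,cf1:q2");
        tblProps.setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
        return tblProps;
    }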

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java Fri Oct  7 17:15:28 2011
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.hbase.LazyHBaseRow;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of ResultConverter using HBaseSerDe.
+ * The mapping between the HBase schema and the HCatRecord schema is defined by
+ * {@link HBaseConstants#PROPERTY_COLUMN_MAPPING_KEY}
+ */
+class HBaseSerDeResultConverter implements ResultConverter {
+    private HBaseSerDe serDe;
+    private HCatSchema schema;
+    private HCatSchema outputSchema;
+    private StructObjectInspector hCatRecordOI;
+    private StructObjectInspector lazyHBaseRowOI;
+    private final Long outputVersion;
+
+    /**
+     * @param schema table schema
+     * @param outputSchema schema of projected output
+     * @param hcatProperties table properties
+     * @throws IOException thrown if hive's HBaseSerDe couldn't be initialized
+     */
+    public HBaseSerDeResultConverter(HCatSchema schema,
+                                     HCatSchema outputSchema,
+                                     Properties hcatProperties) throws IOException {
+
+        hcatProperties.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
+                hcatProperties.getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY));
+
+        if(hcatProperties.containsKey(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY))
+            outputVersion = Long.parseLong(hcatProperties.getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY));
+        else
+            outputVersion = null;
+
+        this.schema = schema;
+        if(outputSchema == null) {
+            this.outputSchema = schema;
+        }
+        else {
+            this.outputSchema = outputSchema;
+        }
+
+        hCatRecordOI = createStructObjectInspector();
+        try {
+            serDe = new HBaseSerDe();
+            serDe.initialize(new Configuration(),hcatProperties);
+            lazyHBaseRowOI = (StructObjectInspector) serDe.getObjectInspector();
+        } catch (SerDeException e) {
+            throw new IOException("SerDe initialization failed",e);
+        }
+    }
+
+    @Override
+    public Put convert(HCatRecord record) throws IOException {
+        try {
+            //small hack to explicitly specify the timestamp/version number to use,
+            //since HBaseSerDe does not support specifying it;
+            //we will have to decide whether to write our own SerDe or to
+            //contribute the code upstream
+            Put put = (Put)serDe.serialize(record.getAll(),hCatRecordOI);
+            Put res;
+            if(outputVersion == null) {
+                res = put;
+            }
+            else {
+                res = new Put(put.getRow(),outputVersion.longValue());
+                for(List<KeyValue> row: put.getFamilyMap().values()) {
+                    for(KeyValue el: row) {
+                        res.add(el.getFamily(),el.getQualifier(),el.getValue());
+                    }
+                }
+            }
+            return res;
+        } catch (SerDeException e) {
+            throw new IOException("serialization failed",e);
+        }
+    }
+
+    @Override
+    public HCatRecord convert(Result result) throws IOException {
+        // Deserialize the HBase Result into a lazy struct and then convert that
+        // struct to an HCatRecord.
+        LazyHBaseRow struct;
+        try {
+            struct = (LazyHBaseRow)serDe.deserialize(result);
+        } catch (SerDeException e) {
+            throw new IOException(e);
+        }
+
+        List<Object> outList = new ArrayList<Object>(outputSchema.size());
+
+        String colName;
+        Integer index;
+
+        for(HCatFieldSchema col : outputSchema.getFields()){
+
+            colName = col.getName().toLowerCase();
+            index = outputSchema.getPosition(colName);
+
+            if(index != null){
+                StructField field = lazyHBaseRowOI.getStructFieldRef(colName);
+                outList.add(getTypedObj(lazyHBaseRowOI.getStructFieldData(struct, field), field.getFieldObjectInspector()));
+            }
+        }
+        return new DefaultHCatRecord(outList);
+    }
+
+    private Object getTypedObj(Object data, ObjectInspector oi) throws IOException{
+        // The real work-horse method. We are gobbling up all the laziness benefits
+        // of Hive's LazyHBaseRow by deserializing everything and creating a crisp
+        // HCatRecord with crisp Java objects inside it. We have to do this because
+        // the higher layers may not know how to.
+        //TODO leverage laziness of SerDe
+        switch(oi.getCategory()){
+
+            case PRIMITIVE:
+                return ((PrimitiveObjectInspector)oi).getPrimitiveJavaObject(data);
+
+            case MAP:
+                MapObjectInspector moi = (MapObjectInspector)oi;
+                Map<?,?> lazyMap = moi.getMap(data);
+                ObjectInspector keyOI = moi.getMapKeyObjectInspector();
+                ObjectInspector valOI = moi.getMapValueObjectInspector();
+                Map<Object,Object> typedMap = new HashMap<Object,Object>(lazyMap.size());
+                for(Map.Entry<?,?> e : lazyMap.entrySet()){
+                    typedMap.put(getTypedObj(e.getKey(), keyOI), getTypedObj(e.getValue(), valOI));
+                }
+                return typedMap;
+
+            case LIST:
+                ListObjectInspector loi = (ListObjectInspector)oi;
+                List<?> lazyList = loi.getList(data);
+                ObjectInspector elemOI = loi.getListElementObjectInspector();
+                List<Object> typedList = new ArrayList<Object>(lazyList.size());
+                Iterator<?> itr = lazyList.listIterator();
+                while(itr.hasNext()){
+                    typedList.add(getTypedObj(itr.next(),elemOI));
+                }
+                return typedList;
+
+            case STRUCT:
+                StructObjectInspector soi = (StructObjectInspector)oi;
+                List<? extends StructField> fields = soi.getAllStructFieldRefs();
+                List<Object> typedStruct = new ArrayList<Object>(fields.size());
+                for(StructField field : fields){
+                    typedStruct.add( getTypedObj(soi.getStructFieldData(data, field), field.getFieldObjectInspector()));
+                }
+                return typedStruct;
+
+
+            default:
+                throw new IOException("Don't know how to deserialize: "+oi.getCategory());
+
+        }
+    }
+
+    private StructObjectInspector createStructObjectInspector() throws IOException {
+
+        if( outputSchema == null ) {
+            throw new IOException("Invalid output schema specified");
+        }
+
+        List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+        List<String> fieldNames = new ArrayList<String>();
+
+        for(HCatFieldSchema hcatFieldSchema : outputSchema.getFields()) {
+            TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString(hcatFieldSchema.getTypeString());
+
+            fieldNames.add(hcatFieldSchema.getName());
+            fieldInspectors.add(getObjectInspector(type));
+        }
+
+        StructObjectInspector structInspector = ObjectInspectorFactory.
+                getStandardStructObjectInspector(fieldNames, fieldInspectors);
+        return structInspector;
+    }
+
+    private ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+
+        switch(type.getCategory()) {
+
+            case PRIMITIVE :
+                PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+                return PrimitiveObjectInspectorFactory.
+                        getPrimitiveJavaObjectInspector(primitiveType.getPrimitiveCategory());
+
+            case MAP :
+                MapTypeInfo mapType = (MapTypeInfo) type;
+                MapObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
+                        getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
+                return mapInspector;
+
+            case LIST :
+                ListTypeInfo listType = (ListTypeInfo) type;
+                ListObjectInspector listInspector = ObjectInspectorFactory.getStandardListObjectInspector(
+                        getObjectInspector(listType.getListElementTypeInfo()));
+                return listInspector;
+
+            case STRUCT :
+                StructTypeInfo structType = (StructTypeInfo) type;
+                List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
+
+                List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+                for(TypeInfo fieldType : fieldTypes) {
+                    fieldInspectors.add(getObjectInspector(fieldType));
+                }
+
+                StructObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+                        structType.getAllStructFieldNames(), fieldInspectors);
+                return structInspector;
+
+            default :
+                throw new IOException("Unknown field schema type");
+        }
+    }
+}
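
A note on the mapping syntax consumed above: in a value such as ":key,my_family:my_qualifier1,my_family:my_qualifier2,my_family2:" (used by the test below), ":key" binds a schema column to the HBase row key, "family:qualifier" binds a scalar column to a single cell, and a bare "family:" binds a map-typed column to the entire column family; this is standard Hive HBaseSerDe mapping syntax. A usage sketch of the two conversion directions (schema and properties are passed in; in practice they come from the table definition, compare createHCatSchema() and createProperties() in the test below):

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.schema.HCatSchema;

    static Put writePath(HCatSchema schema, Properties props, HCatRecord record)
            throws IOException {
        // null output schema => the full table schema is projected
        HBaseSerDeResultConverter converter =
                new HBaseSerDeResultConverter(schema, null, props);
        return converter.convert(record);    // HCatRecord -> Put
    }

    static HCatRecord readPath(HCatSchema schema, Properties props, Result result)
            throws IOException {
        HBaseSerDeResultConverter converter =
                new HBaseSerDeResultConverter(schema, null, props);
        return converter.convert(result);    // Result -> HCatRecord
    }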

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java Fri Oct  7 17:15:28 2011
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+
+/**
+ * Interface used to define conversion of HCatRecord to and from native HBase write (Put) and read (Result) objects.
+ * How the actual mapping is defined between an HBase table's schema and an HCatalog table's schema
+ * is up to the underlying implementation.
+ */
+interface ResultConverter {
+
+    /**
+     * Convert an HCatRecord instance to an HBase Put; used when writing out data.
+     * @param record instance to convert
+     * @return converted Put instance
+     * @throws IOException
+     */
+    Put convert(HCatRecord record) throws IOException;
+
+    /**
+     * Convert an HBase Result to an HCatRecord instance; used when reading data.
+     * @param result instance to convert
+     * @return converted HCatRecord instance
+     * @throws IOException
+     */
+    HCatRecord convert(Result result) throws IOException;
+
+}
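
To show where the interface sits in a driver: a hypothetical write loop (the record-writer wiring for the HBase drivers is not part of this patch) would apply the converter once per record and hand the resulting Put to an HTable:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hcatalog.data.HCatRecord;

    // Hypothetical sketch, assuming an already-initialized HTable and converter.
    static void writeAll(HTable table, ResultConverter converter,
                         Iterable<HCatRecord> records) throws IOException {
        for (HCatRecord record : records) {
            Put put = converter.convert(record); // column mapping handled by the converter
            table.put(put);
        }
    }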

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/all-tests Fri Oct  7 17:15:28 2011
@@ -0,0 +1 @@
+**/Test*.java

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/excluded-tests Fri Oct  7 17:15:28 2011
@@ -0,0 +1 @@
+

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/log4j.xml Fri Oct  7 17:15:28 2011
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <appender name="ConsoleAppender" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.TTCCLayout"/>
+    </appender>
+
+
+    <root>
+        <priority value ="INFO" />
+        <appender-ref ref="ConsoleAppender" />
+    </root>
+    <!--<logger name="org.apache.hadoop.mapred">-->
+    <!--<level value="trace"/>-->
+    <!--</logger>-->
+    <!--<logger name="org.apache.hadoop.hbase">-->
+    <!--<level value="trace"/>-->
+    <!--</logger>-->
+</log4j:configuration>
\ No newline at end of file

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java?rev=1180106&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java Fri Oct  7 17:15:28 2011
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test HBaseSerDeResultConverter by manually creating records to convert to and from HBase objects
+ */
+public class TestHBaseSerDeResultConverter {
+
+    private Properties createProperties() {
+        Properties tbl = new Properties();
+        // Set the configuration parameters
+        tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+        tbl.setProperty("columns","key,aint,astring,amap");
+        tbl.setProperty("columns.types","string:int:string:map<string,int>");
+        tbl.setProperty(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+ HBaseSerDe.HBASE_COLUMNS_MAPPING,
+                ":key,my_family:my_qualifier1,my_family:my_qualifier2,my_family2:");
+        tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+        tbl.setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,"1");
+        return tbl;
+    }
+
+    private HCatSchema createHCatSchema() throws HCatException {
+        HCatSchema subSchema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        subSchema.append(new HCatFieldSchema(null, HCatFieldSchema.Type.INT,""));
+
+        HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,""));
+        schema.append(new HCatFieldSchema("aint", HCatFieldSchema.Type.INT,""));
+        schema.append(new HCatFieldSchema("astring", HCatFieldSchema.Type.STRING,""));
+        schema.append(new HCatFieldSchema("amap", HCatFieldSchema.Type.MAP,HCatFieldSchema.Type.STRING,subSchema,""));
+        return schema;
+    }
+
+    @Test
+    public void testDeserialize() throws IOException {
+        HBaseSerDeResultConverter converter = new HBaseSerDeResultConverter(createHCatSchema(),
+                null,
+                createProperties());
+        //test integer
+        Result result = new Result(new KeyValue[]{new KeyValue(Bytes.toBytes("row"),
+                Bytes.toBytes("my_family"),
+                Bytes.toBytes("my_qualifier1"),
+                0,
+                //This is how Hive's SerDe serializes numbers
+                Bytes.toBytes("123")),
+                //test string
+                new KeyValue(Bytes.toBytes("row"),
+                        Bytes.toBytes("my_family"),
+                        Bytes.toBytes("my_qualifier2"),
+                        0,
+                        Bytes.toBytes("onetwothree")),
+                //test family map
+                new KeyValue(Bytes.toBytes("row"),
+                        Bytes.toBytes("my_family2"),
+                        Bytes.toBytes("one"),
+                        0,
+                        Bytes.toBytes("1")),
+                new KeyValue(Bytes.toBytes("row"),
+                        Bytes.toBytes("my_family2"),
+                        Bytes.toBytes("two"),
+                        0,
+                        Bytes.toBytes("2"))});
+
+        HCatRecord record = converter.convert(result);
+
+        assertEquals(Bytes.toString(result.getRow()), record.get(0).toString());
+        assertEquals(Integer.valueOf(
+                Bytes.toString(
+                        result.getValue(Bytes.toBytes("my_family"), Bytes.toBytes("my_qualifier1")))),
+                record.get(1));
+        assertEquals(Bytes.toString(
+                result.getValue(Bytes.toBytes("my_family"), Bytes.toBytes("my_qualifier2"))),
+                record.get(2).toString());
+        Map<String,Integer> recordMap = (Map<String,Integer>)record.get(3);
+        Map<byte[],byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("my_family2"));
+        assertEquals(Integer.valueOf(
+                Bytes.toString(
+                        familyMap.get(Bytes.toBytes("one")))),
+                recordMap.get("one"));
+        assertEquals(Integer.valueOf(
+                Bytes.toString(
+                        familyMap.get(Bytes.toBytes("two")))),
+                recordMap.get("two"));
+    }
+
+    @Test
+    public void testSerialize() throws IOException {
+        HCatSchema schema = createHCatSchema();
+        HBaseSerDeResultConverter converter = new HBaseSerDeResultConverter(schema,
+                null,
+                createProperties());
+        HCatRecord in = new DefaultHCatRecord(4);
+        //row key
+        in.set(0,"row");
+        //test integer
+        in.set(1,123);
+        //test string
+        in.set(2,"onetwothree");
+        //test map
+        Map<String,Integer>  map = new HashMap<String,Integer>();
+        map.put("one",1);
+        map.put("two",2);
+        in.set(3,map);
+
+        Put put = converter.convert(in);
+
+        assertEquals(in.get(0).toString(),Bytes.toString(put.getRow()));
+        assertEquals(in.get(1),
+                Integer.valueOf(
+                        Bytes.toString(
+                                put.get(Bytes.toBytes("my_family"),
+                                        Bytes.toBytes("my_qualifier1")).get(0).getValue())));
+        assertEquals(1L,
+                put.get(Bytes.toBytes("my_family"),
+                        Bytes.toBytes("my_qualifier1")).get(0).getTimestamp());
+        assertEquals(in.get(2),
+                Bytes.toString(
+                        put.get(Bytes.toBytes("my_family"),
+                                Bytes.toBytes("my_qualifier2")).get(0).getValue()));
+        assertEquals(1L,
+                put.get(Bytes.toBytes("my_family"),
+                        Bytes.toBytes("my_qualifier2")).get(0).getTimestamp());
+        assertEquals(map.get("one"),
+                Integer.valueOf(
+                        Bytes.toString(
+                                put.get(Bytes.toBytes("my_family2"),
+                                        Bytes.toBytes("one")).get(0).getValue())));
+        assertEquals(1L,
+                put.get(Bytes.toBytes("my_family2"),
+                        Bytes.toBytes("one")).get(0).getTimestamp());
+        assertEquals(map.get("two"),
+                Integer.valueOf(Bytes.toString(
+                        put.get("my_family2".getBytes(),
+                                "two".getBytes()).get(0).getValue())));
+        assertEquals(1L,
+                put.get(Bytes.toBytes("my_family2"),
+                        Bytes.toBytes("two")).get(0).getTimestamp());
+    }
+}