Posted to hcatalog-commits@incubator.apache.org by tr...@apache.org on 2012/08/17 19:51:07 UTC

svn commit: r1374409 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/mapreduce/

Author: travis
Date: Fri Aug 17 19:51:06 2012
New Revision: 1374409

URL: http://svn.apache.org/viewvc?rev=1374409&view=rev
Log:
HCATALOG-472 HCatalog should use Deserializer in the read path instead of SerDe
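
The rationale: org.apache.hadoop.hive.serde2.SerDe extends both Serializer and
Deserializer, but the read path only ever deserializes. Depending on the
narrower Deserializer interface lets read-only serdes, such as
ThriftDeserializer (which implements Deserializer but not SerDe), work with
HCatalog; the new test below exercises exactly that. A minimal sketch of the
distinction, assuming Hive's serde2 classes as they appear in this diff:

    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;

    public class ReadOnlySerdeSketch {
      public static void main(String[] args) {
        // The read path now requires only the Deserializer contract ...
        Deserializer d = new ThriftDeserializer();
        // ... whereas the old code required a full SerDe, which a read-only
        // class like ThriftDeserializer does not implement:
        // SerDe s = new ThriftDeserializer();  // would not compile
        System.out.println(d.getClass().getName());
      }
    }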

Added:
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1374409&r1=1374408&r2=1374409&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Aug 17 19:51:06 2012
@@ -79,6 +79,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-472 HCatalog should use Deserializer in the read path instead of SerDe (traviscrawford)
+
   HCAT-467 Templeton scripts point to wrong jar name (jghoman via traviscrawford)
 
   HCAT-436 JSON SerDe column misnaming on CTAS (khorgath via gates)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1374409&r1=1374408&r2=1374409&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Fri Aug 17 19:51:06 2012
@@ -20,7 +20,7 @@ package org.apache.hcatalog.mapreduce;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -54,7 +54,7 @@ class HCatRecordReader extends RecordRea
     /** The storage handler used */
     private final HCatStorageHandler storageHandler;
 
-    private SerDe serde;
+    private Deserializer deserializer;
 
     private Map<String,String> valuesNotInDataCols;
 
@@ -82,7 +82,7 @@ class HCatRecordReader extends RecordRea
       HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
 
       baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext);
-      serde = createSerDe(hcatSplit, storageHandler, taskContext);
+      createDeserializer(hcatSplit, storageHandler, taskContext);
 
       // Pull the output schema out of the TaskAttemptContext
       outputSchema = (HCatSchema) HCatUtil.deserialize(
@@ -108,22 +108,20 @@ class HCatRecordReader extends RecordRea
           InternalUtil.createReporter(taskContext));
     }
 
-    private SerDe createSerDe(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
+    private void createDeserializer(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
         TaskAttemptContext taskContext) throws IOException {
 
-      SerDe serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
+      deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
           taskContext.getConfiguration());
 
       try {
-        InternalUtil.initializeInputSerDe(serde, storageHandler.getConf(),
+        InternalUtil.initializeDeserializer(deserializer, storageHandler.getConf(),
             hcatSplit.getPartitionInfo().getTableInfo(),
             hcatSplit.getPartitionInfo().getPartitionSchema());
       } catch (SerDeException e) {
-        throw new IOException("Failed initializing SerDe "
+        throw new IOException("Failed initializing deserializer "
             + storageHandler.getSerDeClass().getName(), e);
       }
-
-      return serde;
     }
 
   /* (non-Javadoc)
@@ -145,7 +143,7 @@ class HCatRecordReader extends RecordRea
 
       try {
 
-        r = new LazyHCatRecord(serde.deserialize(currentValue),serde.getObjectInspector());
+        r = new LazyHCatRecord(deserializer.deserialize(currentValue), deserializer.getObjectInspector());
         DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
         int i = 0;
         for (String fieldName : outputSchema.getFieldNames()){

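In HCatRecordReader the factory method no longer returns the instance: it now
assigns the deserializer field directly. The read flow itself is unchanged; the
deserializer turns the Writable produced by the base record reader into a row
Object, and its ObjectInspector tells LazyHCatRecord how to pull fields out of
that Object lazily. A rough sketch of that flow, as a fragment that assumes the
fields and imports visible in the diff above:

    // Sketch only: mirrors the loop in the last hunk of nextKeyValue().
    Object row = deserializer.deserialize(currentValue);     // raw row object
    ObjectInspector oi = deserializer.getObjectInspector();  // describes its layout
    HCatRecord r = new LazyHCatRecord(row, oi);              // lazy field access
    DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
    // copy each field named in outputSchema.getFieldNames() from r into dr
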
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1374409&r1=1374408&r2=1374409&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Fri Aug 17 19:51:06 2012
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,13 +53,14 @@ import java.util.Map;
 import java.util.Properties;
 
 class InternalUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(InternalUtil.class);
 
     static StorerInfo extractStorerInfo(StorageDescriptor sd, Map<String, String> properties) throws IOException {
         Properties hcatProperties = new Properties();
         for (String key : properties.keySet()){
             hcatProperties.put(key, properties.get(key));
         }
-        
+
         // also populate with StorageDescriptor->SerDe.Parameters
         for (Map.Entry<String, String>param :
             sd.getSerdeInfo().getParameters().entrySet()) {
@@ -132,28 +136,26 @@ class InternalUtil {
 
  //TODO this has to find a better home; it's also hardcoded as the default in
  // Hive. Would be nice if the default was decided by the serde.
-  static void initializeOutputSerDe(SerDe serDe, Configuration conf, 
-                                    OutputJobInfo jobInfo) 
-  throws SerDeException {
-    initializeSerDe(serDe, conf, jobInfo.getTableInfo(), 
-                    jobInfo.getOutputSchema()); 
-  }
-
-  static void initializeInputSerDe(SerDe serDe, Configuration conf, 
-                                   HCatTableInfo info, HCatSchema s)
-  throws SerDeException {
-    initializeSerDe(serDe, conf, info, s); 
-  }
-
-  static void initializeSerDe(SerDe serDe, Configuration conf, 
-                              HCatTableInfo info, HCatSchema s)
-  throws SerDeException {
-     Properties props = new Properties();
+  static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo)
+      throws SerDeException {
+    serDe.initialize(conf, getSerdeProperties(jobInfo.getTableInfo(), jobInfo.getOutputSchema()));
+  }
+
+  static void initializeDeserializer(Deserializer deserializer, Configuration conf,
+      HCatTableInfo info, HCatSchema schema) throws SerDeException {
+    Properties props = getSerdeProperties(info, schema);
+    LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props);
+    deserializer.initialize(conf, props);
+  }
+
+  private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
+      throws SerDeException {
+    Properties props = new Properties();
     List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
     props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS,
-          MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+        MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
     props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
-          MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+        MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
 
     // setting these props to match LazySimpleSerDe
     props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_NULL_FORMAT, "\\N");
@@ -162,7 +164,7 @@ class InternalUtil {
     //add props from params set in table schema
     props.putAll(info.getStorerInfo().getProperties());
 
-    serDe.initialize(conf,props);
+    return props;
   }
 
 static Reporter createReporter(TaskAttemptContext context) {

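The InternalUtil change collapses the three initialize* methods into two entry
points backed by a shared getSerdeProperties(...) helper, so the write-side
SerDe and the read-side Deserializer are initialized from one and the same
property set, and the read path now logs which deserializer is initialized
with which properties. A hedged sketch of the property set the helper builds;
the column names and types are illustrative, borrowed from the IntString table
in the test below:

    // Fragment; assumes a Configuration conf and Deserializer deserializer in
    // scope, plus java.util.Properties.
    Properties props = new Properties();
    props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS,
        "myint,mystring,underscore_int");
    props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
        "int,string,int");
    // match LazySimpleSerDe's default null representation
    props.setProperty(
        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_NULL_FORMAT, "\\N");
    // plus any serde parameters from the table's StorerInfo, then:
    deserializer.initialize(conf, props);
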
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java?rev=1374409&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java Fri Aug 17 19:51:06 2012
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Iterator;
+
+public class TestHCatHiveThriftCompatibility extends HCatBaseTest {
+
+  private boolean setUpComplete = false;
+  private Path intStringSeq;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    if (setUpComplete) {
+      return;
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(out);
+    TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+    IntString intString = new IntString(1, "one", 1);
+    intString.write(protocol);
+    BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+
+    intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+    LOG.info("Creating data file: " + intStringSeq);
+
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+        intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+        NullWritable.class, BytesWritable.class);
+    seqFileWriter.append(NullWritable.get(), bytesWritable);
+    seqFileWriter.close();
+
+    setUpComplete = true;
+  }
+
+  /**
+   *  Create a table with no explicit schema and ensure it's correctly
+   *  discovered from the thrift struct.
+   */
+  @Test
+  public void testDynamicCols() throws Exception {
+    Assert.assertEquals(0, driver.run("drop table if exists test_thrift").getResponseCode());
+    Assert.assertEquals(0, driver.run(
+        "create external table test_thrift " +
+            "partitioned by (year string) " +
+            "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+            "with serdeproperties ( " +
+            "  'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+            "  'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+            "stored as" +
+            "  inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+            "  outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+        .getResponseCode());
+    Assert.assertEquals(0,
+        driver.run("alter table test_thrift add partition (year = '2012') location '" +
+            intStringSeq.getParent() + "'").getResponseCode());
+
+    PigServer pigServer = new PigServer(ExecType.LOCAL);
+    pigServer.registerQuery("A = load 'test_thrift' using org.apache.hcatalog.pig.HCatLoader();");
+
+    Schema expectedSchema = new Schema();
+    expectedSchema.add(new Schema.FieldSchema("myint", DataType.INTEGER));
+    expectedSchema.add(new Schema.FieldSchema("mystring", DataType.CHARARRAY));
+    expectedSchema.add(new Schema.FieldSchema("underscore_int", DataType.INTEGER));
+    expectedSchema.add(new Schema.FieldSchema("year", DataType.CHARARRAY));
+
+    Assert.assertEquals(expectedSchema, pigServer.dumpSchema("A"));
+
+    Iterator<Tuple> iterator = pigServer.openIterator("A");
+    Tuple t = iterator.next();
+    Assert.assertEquals(1, t.get(0));
+    Assert.assertEquals("one", t.get(1));
+    Assert.assertEquals(1, t.get(2));
+    Assert.assertEquals("2012", t.get(3));
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+}
\ No newline at end of file
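
For reference, setUp() above writes one thrift-serialized IntString record with
TBinaryProtocol, and ThriftDeserializer reads it back the same way at query
time. A minimal round-trip sketch; the read(TProtocol) method comes from
standard thrift codegen, and the field names are inferred from the expected Pig
schema, so treat both as assumptions:

    // Fragment; reuses the out, IntString, TBinaryProtocol and
    // TIOStreamTransport names from the test above.
    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    TBinaryProtocol readProtocol = new TBinaryProtocol(new TIOStreamTransport(in));
    IntString copy = new IntString();
    copy.read(readProtocol);  // thrift-generated classes expose read(TProtocol)
    // copy now holds myint=1, myString="one", underscore_int=1, which is what
    // the Pig assertions above verify through HCatLoader.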