Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2012/03/15 00:30:21 UTC

svn commit: r1300797 - in /incubator/hcatalog/branches/branch-0.4: ./ src/java/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/mapreduce/

Author: gates
Date: Thu Mar 15 00:30:20 2012
New Revision: 1300797

URL: http://svn.apache.org/viewvc?rev=1300797&view=rev
Log:
HCATALOG-291 Pig and MR fail to write to a sequence file

Added:
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java
Modified:
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1300797&r1=1300796&r2=1300797&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Thu Mar 15 00:30:20 2012
@@ -70,6 +70,8 @@ Release 0.4.0 - Unreleased
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-291 Pig and MR fail to write to a sequence file (avandana via gates)
+
   HCAT-305 hcat shell is not properly picking up the hcat jar (gates)
 
   HCAT-307 NOTICE and LICENSE file need updated to add new jars and move copyright date to 2012 (gates)

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1300797&r1=1300796&r2=1300797&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java Thu Mar 15 00:30:20 2012
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -37,7 +39,9 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
@@ -73,6 +77,23 @@ class FileOutputFormatContainer extends 
         //this needs to be manually set, under normal circumstances MR Task does this
         setWorkOutputPath(context);
 
+        //Configure the output key and value classes.
+        // This is required for writing null as key for file based tables.
+        context.getConfiguration().set("mapred.output.key.class",
+                NullWritable.class.getName());
+        String jobInfoString = context.getConfiguration().get(
+                HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil
+                .deserialize(jobInfoString);
+        StorerInfo storeInfo = jobInfo.getTableInfo().getStorerInfo();
+        HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
+                context.getConfiguration(), storeInfo);
+        Class<? extends SerDe> serde = storageHandler.getSerDeClass();
+        SerDe sd = (SerDe) ReflectionUtils.newInstance(serde,
+                context.getConfiguration());
+        context.getConfiguration().set("mapred.output.value.class",
+                sd.getSerializedClass().getName());
+
         // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. Can use null.
         // (That's because records can't be written until the values of the dynamic partitions are deduced.
         // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.)
@@ -103,7 +124,7 @@ class FileOutputFormatContainer extends 
         } catch (TException e) {
             throw new IOException(e);
         } catch (NoSuchObjectException e) {
-            throw new IOException(e);        	
+            throw new IOException(e);
         }
 
         if(!jobInfo.isDynamicPartitioningUsed()) {

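The hunk above addresses the root cause for file-based tables: the underlying output format derives the SequenceFile header's key and value classes from the job configuration, and HCatalog never populated "mapred.output.key.class" or "mapred.output.value.class", so Hadoop fell back to its defaults (LongWritable/Text). A minimal sketch of the failing path, using only stock Hadoop APIs (the helper class below is illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.mapred.JobConf;

    class SketchOpenWriter {
        // Roughly what SequenceFileOutputFormat does when opening a writer:
        // the header classes come straight from the job configuration.
        static SequenceFile.Writer open(FileSystem fs, JobConf job, Path file)
                throws IOException {
            return SequenceFile.createWriter(fs, job, file,
                    job.getOutputKeyClass(),    // "mapred.output.key.class"
                    job.getOutputValueClass()); // "mapred.output.value.class"
        }
    }

Appending a record whose classes disagree with that header fails with "java.io.IOException: wrong key class: ... is not class ...", which is why the patch sets the key class to NullWritable and the value class to the SerDe's serialized class before the writer is created.
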
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1300797&r1=1300796&r2=1300797&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java Thu Mar 15 00:30:20 2012
@@ -18,11 +18,18 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
@@ -38,12 +45,6 @@ import org.apache.hcatalog.common.HCatEx
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Part of the FileOutput*Container classes
  * See {@link FileOutputFormatContainer} for more information
@@ -246,7 +247,7 @@ class FileRecordWriterContainer extends 
 
         //The key given by user is ignored
         try {
-            localWriter.write(null, localSerDe.serialize(value.getAll(), localObjectInspector));
+            localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector));
         } catch (SerDeException e) {
             throw new IOException("Failed to serialize object",e);
         }

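The one-line change above complements the configuration fix. SequenceFile.Writer.append validates the key against the header by dereferencing it (key.getClass()), so a literal null key throws a NullPointerException; NullWritable.get() returns a zero-byte singleton that satisfies the check without adding any payload. A hedged sketch of the idiom, assuming a writer whose key class is NullWritable (as the patch now configures):

    import java.io.IOException;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;

    class SketchNullKey {
        static void write(SequenceFile.Writer writer, Writable value)
                throws IOException {
            // writer.append(null, value) would NPE inside append(), which
            // checks the key's class against the file header.
            writer.append(NullWritable.get(), value); // zero-byte singleton key
        }
    }
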
Added: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java?rev=1300797&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java Thu Mar 15 00:30:20 2012
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Test;
+
+public class TestSequenceFileReadWrite {
+
+    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static Driver driver;
+    private static Properties props;
+    private static PigServer server;
+    private static final String basicFile = "/tmp/basic.input.data";
+    private static String fullFileNameBasic;
+    private static String[] input;
+    private static HiveConf hiveConf;
+
+    public void Initialize() throws Exception {
+        hiveConf = new HiveConf(this.getClass());
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
+                "false");
+        driver = new Driver(hiveConf);
+        SessionState.start(new CliSessionState(hiveConf));
+        props = new Properties();
+        props.setProperty("fs.default.name", cluster.getProperties()
+                .getProperty("fs.default.name"));
+        fullFileNameBasic = cluster.getProperties().getProperty(
+                "fs.default.name")
+                + basicFile;
+
+        int numRows = 3;
+        input = new String[numRows];
+        for (int i = 0; i < numRows; i++) {
+            String col1 = "a" + i;
+            String col2 = "b" + i;
+            input[i] = i + "," + col1 + "," + col2;
+        }
+        MiniCluster.deleteFile(cluster, basicFile);
+        MiniCluster.createInputFile(cluster, basicFile, input);
+        server = new PigServer(ExecType.LOCAL, props);
+    }
+
+    @Test
+   public void testSequenceTableWriteRead() throws Exception{
+        Initialize();
+        String createTable = "CREATE TABLE demo_table(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
+        driver.run("drop table demo_table");
+        int retCode1 = driver.run(createTable).getResponseCode();
+        assertTrue(retCode1 == 0);
+
+        UDFContext.getUDFContext().setClientSystemProps();
+        server.setBatchOn();
+        server.registerQuery("A = load '"
+                + fullFileNameBasic
+                + "' using PigStorage(',') as (a0:int,a1:chararray,a2:chararray);");
+        server.registerQuery("store A into 'demo_table' using org.apache.hcatalog.pig.HCatStorer();");
+        server.executeBatch();
+
+        server.registerQuery("B = load 'demo_table' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> XIter = server.openIterator("B");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(3, t.size());
+            assertEquals(t.get(0).toString(), "" + numTuplesRead);
+            assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+            assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+            numTuplesRead++;
+        }
+        assertEquals(input.length, numTuplesRead);
+   }
+
+    @Test
+    public void testTextTableWriteRead() throws Exception{
+        Initialize();
+        String createTable = "CREATE TABLE demo_table_1(a0 int, a1 String, a2 String) STORED AS TEXTFILE";
+        driver.run("drop table demo_table_1");
+        int retCode1 = driver.run(createTable).getResponseCode();
+        assertTrue(retCode1 == 0);
+
+        UDFContext.getUDFContext().setClientSystemProps();
+        server.setBatchOn();
+        server.registerQuery("A = load '"
+                + fullFileNameBasic
+                + "' using PigStorage(',') as (a0:int,a1:chararray,a2:chararray);");
+        server.registerQuery("store A into 'demo_table_1' using org.apache.hcatalog.pig.HCatStorer();");
+        server.executeBatch();
+
+        server.registerQuery("B = load 'demo_table_1' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> XIter = server.openIterator("B");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(3, t.size());
+            assertEquals(t.get(0).toString(), "" + numTuplesRead);
+            assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+            assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+            numTuplesRead++;
+        }
+        assertEquals(input.length, numTuplesRead);
+    }
+
+    @Test
+    public void testSequenceTableWriteReadMR() throws Exception{
+        Initialize();
+        String createTable = "CREATE TABLE demo_table_2(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
+        driver.run("drop table demo_table_2");
+        int retCode1 = driver.run(createTable).getResponseCode();
+        assertTrue(retCode1 == 0);
+
+        Configuration conf = new Configuration();
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(hiveConf.getAllProperties()));
+        Job job = new Job(conf, "Write-hcat-seq-table");
+        job.setJarByClass(TestSequenceFileReadWrite.class);
+
+        job.setMapperClass(Map.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(DefaultHCatRecord.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, this.fullFileNameBasic);
+
+        HCatOutputFormat.setOutput(job, OutputJobInfo.create(
+                MetaStoreUtils.DEFAULT_DATABASE_NAME, "demo_table_2", null));
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        HCatOutputFormat.setSchema(job, getSchema());
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+        new FileOutputCommitterContainer(job, null).cleanupJob(job);
+        assertTrue(job.isSuccessful());
+
+        UDFContext.getUDFContext().setClientSystemProps();
+        server.setBatchOn();
+        server.registerQuery("C = load 'default.demo_table_2' using org.apache.hcatalog.pig.HCatLoader();");
+        server.executeBatch();
+        Iterator<Tuple> XIter = server.openIterator("C");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(3, t.size());
+            assertEquals(t.get(0).toString(), "" + numTuplesRead);
+            assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+            assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+            numTuplesRead++;
+        }
+        assertEquals(input.length, numTuplesRead);
+    }
+
+    @Test
+    public void testTextTableWriteReadMR() throws Exception {
+        Initialize();
+        String createTable = "CREATE TABLE demo_table_3(a0 int, a1 String, a2 String) STORED AS TEXTFILE";
+        driver.run("drop table demo_table_3");
+        int retCode1 = driver.run(createTable).getResponseCode();
+        assertTrue(retCode1 == 0);
+
+        Configuration conf = new Configuration();
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(hiveConf.getAllProperties()));
+        Job job = new Job(conf, "Write-hcat-text-table");
+        job.setJarByClass(TestSequenceFileReadWrite.class);
+
+        job.setMapperClass(Map.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(DefaultHCatRecord.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, this.fullFileNameBasic);
+
+        HCatOutputFormat.setOutput(job, OutputJobInfo.create(
+                MetaStoreUtils.DEFAULT_DATABASE_NAME, "demo_table_3", null));
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        HCatOutputFormat.setSchema(job, getSchema());
+        assertTrue(job.waitForCompletion(true));
+        new FileOutputCommitterContainer(job, null).cleanupJob(job);
+        assertTrue(job.isSuccessful());
+
+        UDFContext.getUDFContext().setClientSystemProps();
+        server.setBatchOn();
+        server.registerQuery("D = load 'default.demo_table_3' using org.apache.hcatalog.pig.HCatLoader();");
+        server.executeBatch();
+        Iterator<Tuple> XIter = server.openIterator("D");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(3, t.size());
+            assertEquals(t.get(0).toString(), "" + numTuplesRead);
+            assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+            assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+            numTuplesRead++;
+        }
+        assertEquals(input.length, numTuplesRead);
+    }
+
+
+  public static class Map extends Mapper<LongWritable, Text, NullWritable, DefaultHCatRecord>{
+
+      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+          String[] cols = value.toString().split(",");
+          DefaultHCatRecord record = new DefaultHCatRecord(3);
+          record.set(0,Integer.parseInt(cols[0]));
+          record.set(1,cols[1]);
+          record.set(2,cols[2]);
+          context.write(NullWritable.get(), record);
+      }
+    }
+
+  private HCatSchema getSchema() throws HCatException {
+      HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+      schema.append(new HCatFieldSchema("a0", HCatFieldSchema.Type.INT,
+              ""));
+      schema.append(new HCatFieldSchema("a1",
+              HCatFieldSchema.Type.STRING, ""));
+      schema.append(new HCatFieldSchema("a2",
+              HCatFieldSchema.Type.STRING, ""));
+      return schema;
+  }
+
+}
\ No newline at end of file