Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC

svn commit: r1520466 [11/18] - in /hive/trunk/hcatalog: core/src/main/java/org/apache/hcatalog/cli/ core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/ core/src/main/java/org/apache/hcatalog/common/ core/src/main/java/org/apache/hcatalog/data/...

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java.broken (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java.broken Fri Sep  6 00:49:14 2013
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+/**
+ *
+ * TestHCatEximOutputFormat. Some basic testing is done here; more thorough testing
+ * is done via TestHCatEximInputFormat
+ *
+ */
+public class TestHCatEximOutputFormat extends TestCase {
+
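+  /**
+   * Mapper that parses each comma-separated input line into an HCatRecord
+   * (emp_id, emp_name, emp_dob, emp_sex) using the table schema obtained from the output format.
+   */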
+  public static class TestMap extends
+      Mapper<LongWritable, Text, LongWritable, HCatRecord> {
+
+    private HCatSchema recordSchema;
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      super.setup(context);
+      recordSchema = HCatEximOutputFormat.getTableSchema(context);
+      System.out.println("TestMap/setup called");
+    }
+
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      String[] cols = value.toString().split(",");
+      HCatRecord record = new DefaultHCatRecord(recordSchema.size());
+      System.out.println("TestMap/map called. Cols[0]:" + cols[0]);
+      System.out.println("TestMap/map called. Cols[1]:" + cols[1]);
+      System.out.println("TestMap/map called. Cols[2]:" + cols[2]);
+      System.out.println("TestMap/map called. Cols[3]:" + cols[3]);
+      record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0]));
+      record.setString("emp_name", recordSchema, cols[1]);
+      record.setString("emp_dob", recordSchema, cols[2]);
+      record.setString("emp_sex", recordSchema, cols[3]);
+      context.write(key, record);
+    }
+  }
+
+
+  private static final String dbName = "hcatEximOutputFormatTestDB";
+  private static final String tblName = "hcatEximOutputFormatTestTable";
+  Configuration conf;
+  Job job;
+  List<HCatFieldSchema> columns;
+  HCatSchema schema;
+  FileSystem fs;
+  Path outputLocation;
+  Path dataLocation;
+
+  public void testNonPart() throws Exception {
+    try {
+      HCatEximOutputFormat.setOutput(
+          job,
+          dbName,
+          tblName,
+          outputLocation.toString(),
+          null,
+          null,
+          schema);
+
+      job.waitForCompletion(true);
+      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
+      committer.cleanupJob(job);
+
+      Path metadataPath = new Path(outputLocation, "_metadata");
+      Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
+      Table table = rv.getKey();
+      List<Partition> partitions = rv.getValue();
+
+      assertEquals(dbName, table.getDbName());
+      assertEquals(tblName, table.getTableName());
+      assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+          HCatUtil.getFieldSchemaList(columns)));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+          table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+          table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+      assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+          table.getSd().getInputFormat());
+      assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+          table.getSd().getOutputFormat());
+      assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+          table.getSd().getSerdeInfo().getSerializationLib());
+      assertEquals(0, table.getPartitionKeys().size());
+
+      assertEquals(0, partitions.size());
+    } catch (Exception e) {
+      System.out.println("Test failed with " + e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+
+  }
+
+  public void testPart() throws Exception {
+    try {
+      List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>();
+      partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_country",
+          Constants.STRING_TYPE_NAME, "")));
+      partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_state",
+          Constants.STRING_TYPE_NAME, "")));
+      HCatSchema partitionSchema = new HCatSchema(partKeys);
+
+      List<String> partitionVals = new ArrayList<String>();
+      partitionVals.add("IN");
+      partitionVals.add("TN");
+
+      HCatEximOutputFormat.setOutput(
+          job,
+          dbName,
+          tblName,
+          outputLocation.toString(),
+          partitionSchema,
+          partitionVals,
+          schema);
+
+      job.waitForCompletion(true);
+      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
+      committer.cleanupJob(job);
+      Path metadataPath = new Path(outputLocation, "_metadata");
+      Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
+      Table table = rv.getKey();
+      List<Partition> partitions = rv.getValue();
+
+      assertEquals(dbName, table.getDbName());
+      assertEquals(tblName, table.getTableName());
+      assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+          HCatUtil.getFieldSchemaList(columns)));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+          table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+          table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+      assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+          table.getSd().getInputFormat());
+      assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+          table.getSd().getOutputFormat());
+      assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+          table.getSd().getSerdeInfo().getSerializationLib());
+      assertEquals(2, table.getPartitionKeys().size());
+      List<FieldSchema> partSchema = table.getPartitionKeys();
+      assertEquals("emp_country", partSchema.get(0).getName());
+      assertEquals("emp_state", partSchema.get(1).getName());
+
+      assertEquals(1, partitions.size());
+      Partition partition = partitions.get(0);
+      assertEquals("IN", partition.getValues().get(0));
+      assertEquals("TN", partition.getValues().get(1));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+          partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+          partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+    } catch (Exception e) {
+      System.out.println("Test failed with " + e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    System.out.println("Setup started");
+    super.setUp();
+    conf = new Configuration();
+    job = new Job(conf, "test eximoutputformat");
+    columns = new ArrayList<HCatFieldSchema>();
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+        Constants.INT_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
+        Constants.STRING_TYPE_NAME, "")));
+    schema = new HCatSchema(columns);
+
+    fs = new LocalFileSystem();
+    fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+    outputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports");
+    if (fs.exists(outputLocation)) {
+      fs.delete(outputLocation, true);
+    }
+    dataLocation = new Path(fs.getWorkingDirectory(), "tmp/data");
+    if (fs.exists(dataLocation)) {
+      fs.delete(dataLocation, true);
+    }
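+    // Write a small comma-separated employee data file to use as the map input.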
+    FSDataOutputStream ds = fs.create(dataLocation, true);
+    ds.writeBytes("237,Krishna,01/01/1990,M,IN,TN\n");
+    ds.writeBytes("238,Kalpana,01/01/2000,F,IN,KA\n");
+    ds.writeBytes("239,Satya,01/01/2001,M,US,TN\n");
+    ds.writeBytes("240,Kavya,01/01/2002,F,US,KA\n");
+    ds.close();
+
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(HCatEximOutputFormat.class);
+    TextInputFormat.setInputPaths(job, dataLocation);
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(TestMap.class);
+    job.setNumReduceTasks(0);
+    System.out.println("Setup done");
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    System.out.println("Teardown started");
+    super.tearDown();
+    fs.delete(dataLocation, true);
+    fs.delete(outputLocation, true);
+    System.out.println("Teardown done");
+  }
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatHiveCompatibility extends HCatBaseTest {
+    private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";
+
+    @BeforeClass
+    public static void createInputData() throws Exception {
+        int LOOP_SIZE = 11;
+        File file = new File(INPUT_FILE_NAME);
+        file.deleteOnExit();
+        FileWriter writer = new FileWriter(file);
+        for (int i = 0; i < LOOP_SIZE; i++) {
+            writer.write(i + "\t1\n");
+        }
+        writer.close();
+    }
+
+    @Test
+    public void testUnpartedReadWrite() throws Exception {
+
+        driver.run("drop table if exists junit_unparted_noisd");
+        String createTable = "create table junit_unparted_noisd(a int) stored as RCFILE";
+        Assert.assertEquals(0, driver.run(createTable).getResponseCode());
+
+        // assert that the table created has no hcat instrumentation, and that we're still able to read it.
+        Table table = client.getTable("default", "junit_unparted_noisd");
+        Assert.assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        logAndRegister(server, "A = load '" + INPUT_FILE_NAME + "' as (a:int);");
+        logAndRegister(server, "store A into 'default.junit_unparted_noisd' using org.apache.hcatalog.pig.HCatStorer();");
+        logAndRegister(server, "B = load 'default.junit_unparted_noisd' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> itr = server.openIterator("B");
+
+        int i = 0;
+
+        while (itr.hasNext()) {
+            Tuple t = itr.next();
+            Assert.assertEquals(1, t.size());
+            Assert.assertEquals(t.get(0), i);
+            i++;
+        }
+
+        Assert.assertFalse(itr.hasNext());
+        Assert.assertEquals(11, i);
+
+        // assert that the table created still has no hcat instrumentation
+        Table table2 = client.getTable("default", "junit_unparted_noisd");
+        Assert.assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+        driver.run("drop table junit_unparted_noisd");
+    }
+
+    @Test
+    public void testPartedRead() throws Exception {
+
+        driver.run("drop table if exists junit_parted_noisd");
+        String createTable = "create table junit_parted_noisd(a int) partitioned by (b string) stored as RCFILE";
+        Assert.assertEquals(0, driver.run(createTable).getResponseCode());
+
+        // assert that the table created has no hcat instrumentation, and that we're still able to read it.
+        Table table = client.getTable("default", "junit_parted_noisd");
+        Assert.assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        logAndRegister(server, "A = load '" + INPUT_FILE_NAME + "' as (a:int);");
+        logAndRegister(server, "store A into 'default.junit_parted_noisd' using org.apache.hcatalog.pig.HCatStorer('b=42');");
+        logAndRegister(server, "B = load 'default.junit_parted_noisd' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> itr = server.openIterator("B");
+
+        int i = 0;
+
+        while (itr.hasNext()) {
+            Tuple t = itr.next();
+            Assert.assertEquals(2, t.size()); // Contains explicit field "a" and partition "b".
+            Assert.assertEquals(t.get(0), i);
+            Assert.assertEquals(t.get(1), "42");
+            i++;
+        }
+
+        Assert.assertFalse(itr.hasNext());
+        Assert.assertEquals(11, i);
+
+        // assert that the table created still has no hcat instrumentation
+        Table table2 = client.getTable("default", "junit_parted_noisd");
+        Assert.assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+        // assert that there is one partition present, and it had hcat instrumentation inserted when it was created.
+        Partition ptn = client.getPartition("default", "junit_parted_noisd", Arrays.asList("42"));
+
+        Assert.assertNotNull(ptn);
+        Assert.assertTrue(ptn.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+        driver.run("drop table junit_unparted_noisd");
+    }
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Iterator;
+
+public class TestHCatHiveThriftCompatibility extends HCatBaseTest {
+
+    private boolean setUpComplete = false;
+    private Path intStringSeq;
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        if (setUpComplete) {
+            return;
+        }
+
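+        // Serialize a single IntString(1, "one", 1) record with TBinaryProtocol into an in-memory buffer.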
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        TIOStreamTransport transport = new TIOStreamTransport(out);
+        TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+        IntString intString = new IntString(1, "one", 1);
+        intString.write(protocol);
+        BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+
+        intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+        LOG.info("Creating data file: " + intStringSeq);
+
+        SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+                intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+                NullWritable.class, BytesWritable.class);
+        seqFileWriter.append(NullWritable.get(), bytesWritable);
+        seqFileWriter.close();
+
+        setUpComplete = true;
+    }
+
+    /**
+     *  Create a table with no explicit schema and ensure it is correctly
+     *  discovered from the thrift struct.
+     */
+    @Test
+    public void testDynamicCols() throws Exception {
+        Assert.assertEquals(0, driver.run("drop table if exists test_thrift").getResponseCode());
+        Assert.assertEquals(0, driver.run(
+                "create external table test_thrift " +
+                        "partitioned by (year string) " +
+                        "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+                        "with serdeproperties ( " +
+                        "  'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+                        "  'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+                        "stored as" +
+                        "  inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+                        "  outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+                .getResponseCode());
+        Assert.assertEquals(0,
+                driver.run("alter table test_thrift add partition (year = '2012') location '" +
+                        intStringSeq.getParent() + "'").getResponseCode());
+
+        PigServer pigServer = new PigServer(ExecType.LOCAL);
+        pigServer.registerQuery("A = load 'test_thrift' using org.apache.hcatalog.pig.HCatLoader();");
+
+        Schema expectedSchema = new Schema();
+        expectedSchema.add(new Schema.FieldSchema("myint", DataType.INTEGER));
+        expectedSchema.add(new Schema.FieldSchema("mystring", DataType.CHARARRAY));
+        expectedSchema.add(new Schema.FieldSchema("underscore_int", DataType.INTEGER));
+        expectedSchema.add(new Schema.FieldSchema("year", DataType.CHARARRAY));
+
+        Assert.assertEquals(expectedSchema, pigServer.dumpSchema("A"));
+
+        Iterator<Tuple> iterator = pigServer.openIterator("A");
+        Tuple t = iterator.next();
+        Assert.assertEquals(1, t.get(0));
+        Assert.assertEquals("one", t.get(1));
+        Assert.assertEquals(1, t.get(2));
+        Assert.assertEquals("2012", t.get(3));
+
+        Assert.assertFalse(iterator.hasNext());
+    }
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class TestHCatInputFormat extends HCatBaseTest {
+
+    private boolean setUpComplete = false;
+
+    /**
+     * Create an input sequence file with 100 records; every 10th record is bad.
+     * Load this data into a Hive table.
+     */
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        if (setUpComplete) {
+            return;
+        }
+
+        Path intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+        LOG.info("Creating data file: " + intStringSeq);
+        SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+            intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+            NullWritable.class, BytesWritable.class);
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        TIOStreamTransport transport = new TIOStreamTransport(out);
+        TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+        for (int i = 1; i <= 100; i++) {
+            if (i % 10 == 0) {
+                seqFileWriter.append(NullWritable.get(), new BytesWritable("bad record".getBytes()));
+            } else {
+                out.reset();
+                IntString intString = new IntString(i, Integer.toString(i), i);
+                intString.write(protocol);
+                BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+                seqFileWriter.append(NullWritable.get(), bytesWritable);
+            }
+        }
+
+        seqFileWriter.close();
+
+        // Now let's load this file into a new Hive table.
+        Assert.assertEquals(0, driver.run("drop table if exists test_bad_records").getResponseCode());
+        Assert.assertEquals(0, driver.run(
+            "create table test_bad_records " +
+                "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+                "with serdeproperties ( " +
+                "  'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+                "  'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+                "stored as" +
+                "  inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+                "  outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+            .getResponseCode());
+        Assert.assertEquals(0, driver.run("load data local inpath '" + intStringSeq.getParent() +
+            "' into table test_bad_records").getResponseCode());
+
+        setUpComplete = true;
+    }
+
+    @Test
+    public void testBadRecordHandlingPasses() throws Exception {
+        Assert.assertTrue(runJob(0.1f));
+    }
+
+    @Test
+    public void testBadRecordHandlingFails() throws Exception {
+        Assert.assertFalse(runJob(0.01f));
+    }
+
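+    /**
+     * Runs a map-only job over the test_bad_records table with the given bad record
+     * threshold and reports whether the job completed successfully.
+     */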
+    private boolean runJob(float badRecordThreshold) throws Exception {
+        Configuration conf = new Configuration();
+
+        conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, badRecordThreshold);
+
+        Job job = new Job(conf);
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MyMapper.class);
+
+        job.setInputFormatClass(HCatInputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
+
+        HCatInputFormat.setInput(job, "default", "test_bad_records");
+
+        job.setMapOutputKeyClass(HCatRecord.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        Path path = new Path(TEST_DATA_DIR, "test_bad_record_handling_output");
+        if (path.getFileSystem(conf).exists(path)) {
+            path.getFileSystem(conf).delete(path, true);
+        }
+
+        TextOutputFormat.setOutputPath(job, path);
+
+        return job.waitForCompletion(true);
+    }
+
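+    /** Mapper that logs each HCatRecord and writes its string form to the text output. */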
+    public static class MyMapper extends Mapper<NullWritable, HCatRecord, NullWritable, Text> {
+        @Override
+        public void map(NullWritable key, HCatRecord value, Context context)
+            throws IOException, InterruptedException {
+            LOG.info("HCatRecord: " + value);
+            context.write(NullWritable.get(), new Text(value.toString()));
+        }
+    }
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHCatMultiOutputFormat {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestHCatMultiOutputFormat.class);
+
+    private static final String DATABASE = "default";
+    private static final String[] tableNames = {"test1", "test2", "test3"};
+    private static final String[] tablePerms = {"755", "750", "700"};
+    private static Path warehousedir = null;
+    private static HashMap<String, HCatSchema> schemaMap = new HashMap<String, HCatSchema>();
+    private static HiveMetaStoreClient hmsc;
+    private static MiniMRCluster mrCluster;
+    private static Configuration mrConf;
+    private static HiveConf hiveConf;
+    private static File workDir;
+
+    private static final String msPort = "20199";
+    private static Thread t;
+
+    static {
+        schemaMap.put(tableNames[0], new HCatSchema(ColumnHolder.hCattest1Cols));
+        schemaMap.put(tableNames[1], new HCatSchema(ColumnHolder.hCattest2Cols));
+        schemaMap.put(tableNames[2], new HCatSchema(ColumnHolder.hCattest3Cols));
+    }
+
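+    /**
+     * Runs an embedded Hive metastore on msPort; started in a background thread during setup.
+     */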
+    private static class RunMS implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                String warehouseConf = HiveConf.ConfVars.METASTOREWAREHOUSE.varname + "="
+                    + warehousedir.toString();
+                HiveMetaStore.main(new String[]{"-v", "-p", msPort, "--hiveconf", warehouseConf});
+            } catch (Throwable t) {
+                System.err.println("Exiting. Got exception from metastore: " + t.getMessage());
+            }
+        }
+
+    }
+
+    /**
+     * Private class which holds all the data for the test cases
+     */
+    private static class ColumnHolder {
+
+        private static ArrayList<HCatFieldSchema> hCattest1Cols = new ArrayList<HCatFieldSchema>();
+        private static ArrayList<HCatFieldSchema> hCattest2Cols = new ArrayList<HCatFieldSchema>();
+        private static ArrayList<HCatFieldSchema> hCattest3Cols = new ArrayList<HCatFieldSchema>();
+
+        private static ArrayList<FieldSchema> partitionCols = new ArrayList<FieldSchema>();
+        private static ArrayList<FieldSchema> test1Cols = new ArrayList<FieldSchema>();
+        private static ArrayList<FieldSchema> test2Cols = new ArrayList<FieldSchema>();
+        private static ArrayList<FieldSchema> test3Cols = new ArrayList<FieldSchema>();
+
+        private static HashMap<String, List<FieldSchema>> colMapping = new HashMap<String, List<FieldSchema>>();
+
+        static {
+            try {
+                FieldSchema keyCol = new FieldSchema("key", serdeConstants.STRING_TYPE_NAME, "");
+                test1Cols.add(keyCol);
+                test2Cols.add(keyCol);
+                test3Cols.add(keyCol);
+                hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+                hCattest2Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+                hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+                FieldSchema valueCol = new FieldSchema("value", serdeConstants.STRING_TYPE_NAME, "");
+                test1Cols.add(valueCol);
+                test3Cols.add(valueCol);
+                hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol));
+                hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol));
+                FieldSchema extraCol = new FieldSchema("extra", serdeConstants.STRING_TYPE_NAME, "");
+                test3Cols.add(extraCol);
+                hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(extraCol));
+                colMapping.put("test1", test1Cols);
+                colMapping.put("test2", test2Cols);
+                colMapping.put("test3", test3Cols);
+            } catch (HCatException e) {
+                LOG.error("Error in setting up schema fields for the table", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        static {
+            partitionCols.add(new FieldSchema("ds", serdeConstants.STRING_TYPE_NAME, ""));
+            partitionCols.add(new FieldSchema("cluster", serdeConstants.STRING_TYPE_NAME, ""));
+        }
+    }
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        String testDir = System.getProperty("test.data.dir", "./");
+        testDir = testDir + "/test_multitable_" + Math.abs(new Random().nextLong()) + "/";
+        workDir = new File(new File(testDir).getCanonicalPath());
+        FileUtil.fullyDelete(workDir);
+        workDir.mkdirs();
+
+        warehousedir = new Path(workDir + "/warehouse");
+
+        // Run hive metastore server
+        t = new Thread(new RunMS());
+        t.start();
+
+        // LocalJobRunner does not work with the mapreduce OutputCommitter, so we need
+        // to use MiniMRCluster instead. See MAPREDUCE-2350.
+        Configuration conf = new Configuration(true);
+        conf.set("yarn.scheduler.capacity.root.queues", "default");
+        conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+
+        FileSystem fs = FileSystem.get(conf);
+        System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
+        mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+            new JobConf(conf));
+        mrConf = mrCluster.createJobConf();
+        fs.mkdirs(warehousedir);
+
+        initializeSetup();
+    }
+
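+    /**
+     * Points HiveConf at the metastore started above and creates the test tables.
+     */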
+    private static void initializeSetup() throws Exception {
+
+        hiveConf = new HiveConf(mrConf, TestHCatMultiOutputFormat.class);
+        hiveConf.set("hive.metastore.local", "false");
+        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
+        hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+        hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES, 3);
+        hiveConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+            HCatSemanticAnalyzer.class.getName());
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+        System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousedir.toString());
+        try {
+            hmsc = new HiveMetaStoreClient(hiveConf, null);
+            initalizeTables();
+        } catch (Throwable e) {
+            LOG.error("Exception encountered while setting up testcase", e);
+            throw new Exception(e);
+        } finally {
+            hmsc.close();
+        }
+    }
+
+    private static void initalizeTables() throws Exception {
+        for (String table : tableNames) {
+            try {
+                if (hmsc.getTable(DATABASE, table) != null) {
+                    hmsc.dropTable(DATABASE, table);
+                }
+            } catch (NoSuchObjectException ignored) {
+            }
+        }
+        for (int i = 0; i < tableNames.length; i++) {
+            createTable(tableNames[i], tablePerms[i]);
+        }
+    }
+
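+    /**
+     * Creates an RCFile-backed table partitioned by (ds, cluster) and applies the given
+     * permission to its directory under the warehouse.
+     */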
+    private static void createTable(String tableName, String tablePerm) throws Exception {
+        Table tbl = new Table();
+        tbl.setDbName(DATABASE);
+        tbl.setTableName(tableName);
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(ColumnHolder.colMapping.get(tableName));
+        tbl.setSd(sd);
+        sd.setParameters(new HashMap<String, String>());
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
+        sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
+        sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+        sd.getSerdeInfo().setSerializationLib(
+            org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+        tbl.setPartitionKeys(ColumnHolder.partitionCols);
+
+        hmsc.createTable(tbl);
+        FileSystem fs = FileSystem.get(mrConf);
+        fs.setPermission(new Path(warehousedir, tableName), new FsPermission(tablePerm));
+    }
+
+    @AfterClass
+    public static void tearDown() throws IOException {
+        FileUtil.fullyDelete(workDir);
+        FileSystem fs = FileSystem.get(mrConf);
+        if (fs.exists(warehousedir)) {
+            fs.delete(warehousedir, true);
+        }
+        if (mrCluster != null) {
+            mrCluster.shutdown();
+        }
+    }
+
+    /**
+     * Simple test case.
+     * <ol>
+     * <li>Submits a mapred job which writes out one fixed line to each of the tables</li>
+     * <li>uses hive fetch task to read the data and see if it matches what was written</li>
+     * </ol>
+     *
+     * @throws Exception if any error occurs
+     */
+    @Test
+    public void testOutputFormat() throws Throwable {
+        HashMap<String, String> partitionValues = new HashMap<String, String>();
+        partitionValues.put("ds", "1");
+        partitionValues.put("cluster", "ag");
+        ArrayList<OutputJobInfo> infoList = new ArrayList<OutputJobInfo>();
+        infoList.add(OutputJobInfo.create("default", tableNames[0], partitionValues));
+        infoList.add(OutputJobInfo.create("default", tableNames[1], partitionValues));
+        infoList.add(OutputJobInfo.create("default", tableNames[2], partitionValues));
+
+        Job job = new Job(hiveConf, "SampleJob");
+
+        job.setMapperClass(MyMapper.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(MultiOutputFormat.class);
+        job.setNumReduceTasks(0);
+
+        JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+
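+        // Register one HCatOutputFormat output, with its own OutputJobInfo and schema, for each target table.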
+        for (int i = 0; i < tableNames.length; i++) {
+            configurer.addOutputFormat(tableNames[i], HCatOutputFormat.class, BytesWritable.class,
+                HCatRecord.class);
+            HCatOutputFormat.setOutput(configurer.getJob(tableNames[i]), infoList.get(i));
+            HCatOutputFormat.setSchema(configurer.getJob(tableNames[i]),
+                schemaMap.get(tableNames[i]));
+        }
+        configurer.configure();
+
+        Path filePath = createInputFile();
+        FileInputFormat.addInputPath(job, filePath);
+        Assert.assertTrue(job.waitForCompletion(true));
+
+        ArrayList<String> outputs = new ArrayList<String>();
+        for (String tbl : tableNames) {
+            outputs.add(getTableData(tbl, "default").get(0));
+        }
+        Assert.assertEquals("Comparing output of table " +
+            tableNames[0] + " is not correct", outputs.get(0), "a,a,1,ag");
+        Assert.assertEquals("Comparing output of table " +
+            tableNames[1] + " is not correct", outputs.get(1), "a,1,ag");
+        Assert.assertEquals("Comparing output of table " +
+            tableNames[2] + " is not correct", outputs.get(2), "a,a,extra,1,ag");
+
+        // Check permissions on the partition dirs and files created
+        for (int i = 0; i < tableNames.length; i++) {
+            Path partitionFile = new Path(warehousedir + "/" + tableNames[i]
+                + "/ds=1/cluster=ag/part-m-00000");
+            FileSystem fs = partitionFile.getFileSystem(mrConf);
+            Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct",
+                fs.getFileStatus(partitionFile).getPermission(),
+                new FsPermission(tablePerms[i]));
+            Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct",
+                fs.getFileStatus(partitionFile.getParent()).getPermission(),
+                new FsPermission(tablePerms[i]));
+            Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct",
+                fs.getFileStatus(partitionFile.getParent().getParent()).getPermission(),
+                new FsPermission(tablePerms[i]));
+
+        }
+        LOG.info("File permissions verified");
+    }
+
+    /**
+     * Create an input file for the map tasks
+     *
+     * @return absolute path of the file.
+     * @throws IOException if any error is encountered
+     */
+    private Path createInputFile() throws IOException {
+        Path f = new Path(workDir + "/MultiTableInput.txt");
+        FileSystem fs = FileSystem.get(mrConf);
+        if (fs.exists(f)) {
+            fs.delete(f, true);
+        }
+        OutputStream out = fs.create(f);
+        for (int i = 0; i < 3; i++) {
+            out.write("a,a\n".getBytes());
+        }
+        out.close();
+        return f;
+    }
+
+    /**
+     * Method to fetch table data
+     *
+     * @param table table name
+     * @param database database name
+     * @return table rows as comma-separated column values
+     * @throws Exception if any error occurs
+     */
+    private List<String> getTableData(String table, String database) throws Exception {
+        HiveConf conf = new HiveConf();
+        conf.addResource("hive-site.xml");
+        ArrayList<String> results = new ArrayList<String>();
+        ArrayList<String> temp = new ArrayList<String>();
+        Hive hive = Hive.get(conf);
+        org.apache.hadoop.hive.ql.metadata.Table tbl = hive.getTable(database, table);
+        FetchWork work;
+        if (!tbl.getPartCols().isEmpty()) {
+            List<Partition> partitions = hive.getPartitions(tbl);
+            List<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
+            List<String> partLocs = new ArrayList<String>();
+            for (Partition part : partitions) {
+                partLocs.add(part.getLocation());
+                partDesc.add(Utilities.getPartitionDesc(part));
+            }
+            work = new FetchWork(partLocs, partDesc, Utilities.getTableDesc(tbl));
+            work.setLimit(100);
+        } else {
+            work = new FetchWork(tbl.getDataLocation().toString(), Utilities.getTableDesc(tbl));
+        }
+        FetchTask task = new FetchTask();
+        task.setWork(work);
+        task.initialize(conf, null, null);
+        task.fetch(temp);
+        for (String str : temp) {
+            results.add(str.replace("\t", ","));
+        }
+        return results;
+    }
+
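+    /**
+     * Writes each input line as a record to a different table, selecting the record
+     * layout by the running line count and routing it through MultiOutputFormat.
+     */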
+    private static class MyMapper extends
+        Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+        private int i = 0;
+
+        @Override
+        protected void map(LongWritable key, Text value, Context context)
+            throws IOException, InterruptedException {
+            HCatRecord record = null;
+            String[] splits = value.toString().split(",");
+            switch (i) {
+            case 0:
+                record = new DefaultHCatRecord(2);
+                record.set(0, splits[0]);
+                record.set(1, splits[1]);
+                break;
+            case 1:
+                record = new DefaultHCatRecord(1);
+                record.set(0, splits[0]);
+                break;
+            case 2:
+                record = new DefaultHCatRecord(3);
+                record.set(0, splits[0]);
+                record.set(1, splits[1]);
+                record.set(2, "extra");
+                break;
+            default:
+                Assert.fail("This should not happen!!!!!");
+            }
+            MultiOutputFormat.write(tableNames[i], null, record, context);
+            i++;
+        }
+    }
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHCatNonPartitioned extends HCatMapReduceTest {
+
+    private static List<HCatRecord> writeRecords;
+    static List<HCatFieldSchema> partitionColumns;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+
+        dbName = null; //test if null dbName works ("default" is used)
+        tableName = "testHCatNonPartitionedTable";
+
+        writeRecords = new ArrayList<HCatRecord>();
+
+        for (int i = 0; i < 20; i++) {
+            List<Object> objList = new ArrayList<Object>();
+
+            objList.add(i);
+            objList.add("strvalue" + i);
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
+
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+    }
+
+    @Override
+    protected List<FieldSchema> getPartitionKeys() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        //empty list, non partitioned
+        return fields;
+    }
+
+    @Override
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+
+    @Test
+    public void testHCatNonPartitionedTable() throws Exception {
+
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        runMRCreate(null, partitionColumns, writeRecords, 10, true);
+
+        //Test for duplicate publish
+        IOException exc = null;
+        try {
+            runMRCreate(null, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
+
+        //Test for publish with invalid partition key name
+        exc = null;
+        partitionMap.clear();
+        partitionMap.put("px", "p1value2");
+
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+        //Read should get 10 rows
+        runMRRead(10);
+
+        hiveReadTest();
+    }
+
+    //Test that data inserted through hcatoutputformat is readable from hive
+    private void hiveReadTest() throws Exception {
+
+        String query = "select * from " + tableName;
+        int retCode = driver.run(query).getResponseCode();
+
+        if (retCode != 0) {
+            throw new Exception("Error " + retCode + " running query " + query);
+        }
+
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        assertEquals(10, res.size());
+    }
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHCatOutputFormat extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestHCatOutputFormat.class);
+    private HiveMetaStoreClient client;
+    private HiveConf hiveConf;
+
+    private static final String dbName = "hcatOutputFormatTestDB";
+    private static final String tblName = "hcatOutputFormatTestTable";
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        hiveConf = new HiveConf(this.getClass());
+
+        try {
+            client = new HiveMetaStoreClient(hiveConf, null);
+
+            initTable();
+        } catch (Throwable e) {
+            LOG.error("Unable to open the metastore", e);
+            throw new Exception(e);
+        }
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+            client.dropTable(dbName, tblName);
+            client.dropDatabase(dbName);
+
+            client.close();
+        } catch (Throwable e) {
+            LOG.error("Unable to close metastore", e);
+            throw new Exception(e);
+        }
+    }
+
+    private void initTable() throws Exception {
+
+        try {
+            client.dropTable(dbName, tblName);
+        } catch (Exception e) {
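+            // Ignore: the table may not exist yet.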
+        }
+        try {
+            client.dropDatabase(dbName);
+        } catch (Exception e) {
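+            // Ignore: the database may not exist yet.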
+        }
+        client.createDatabase(new Database(dbName, "", null, null));
+        assertNotNull(client.getDatabase(dbName).getLocationUri());
+
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("colname", serdeConstants.STRING_TYPE_NAME, ""));
+
+        Table tbl = new Table();
+        tbl.setDbName(dbName);
+        tbl.setTableName(tblName);
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(fields);
+        tbl.setSd(sd);
+
+        //sd.setLocation("hdfs://tmp");
+        sd.setInputFormat(RCFileInputFormat.class.getName());
+        sd.setOutputFormat(RCFileOutputFormat.class.getName());
+        sd.setParameters(new HashMap<String, String>());
+        sd.getParameters().put("test_param_1", "Use this for comments etc");
+        //sd.setBucketCols(new ArrayList<String>(2));
+        //sd.getBucketCols().add("name");
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+        sd.getSerdeInfo().setSerializationLib(
+                org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
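+        // This test table reuses the same field list as both its data columns and its partition keys.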
+        tbl.setPartitionKeys(fields);
+
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tableParams.put("hcat.testarg", "testArgValue");
+
+        tbl.setParameters(tableParams);
+
+        client.createTable(tbl);
+        Path tblPath = new Path(client.getTable(dbName, tblName).getSd().getLocation());
+        assertTrue(tblPath.getFileSystem(hiveConf).mkdirs(new Path(tblPath, "colname=p1")));
+
+    }
+
+    public void testSetOutput() throws Exception {
+        Configuration conf = new Configuration();
+        Job job = new Job(conf, "test outputformat");
+
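+        // initTable() partitions the table on "colname", so publish into the partition colname=p1.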
+        Map<String, String> partitionValues = new HashMap<String, String>();
+        partitionValues.put("colname", "p1");
+        //null server url means local mode
+        OutputJobInfo info = OutputJobInfo.create(dbName, tblName, partitionValues);
+
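+        // setOutput should store the job info (including table metadata fetched from the metastore) in the
+        // job configuration; getJobInfo reads it back for verification.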
+        HCatOutputFormat.setOutput(job, info);
+        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(job);
+
+        assertNotNull(jobInfo.getTableInfo());
+        assertEquals(1, jobInfo.getPartitionValues().size());
+        assertEquals("p1", jobInfo.getPartitionValues().get("colname"));
+        assertEquals(1, jobInfo.getTableInfo().getDataColumns().getFields().size());
+        assertEquals("colname", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
+
+        publishTest(job);
+    }
+
+    public void publishTest(Job job) throws Exception {
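+        // Committing the job through the output committer is expected to register the partition in the metastore.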
+        OutputCommitter committer = new FileOutputCommitterContainer(job, null);
+        committer.commitJob(job);
+
+        Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
+        assertNotNull(part);
+
+        StorerInfo storer = InternalUtil.extractStorerInfo(part.getSd(), part.getParameters());
+        assertEquals("testArgValue", storer.getProperties().get("hcat.testarg"));
+        assertTrue(part.getSd().getLocation().contains("p1"));
+    }
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.NoExitSecurityManager;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatPartitionPublish {
+    private static Configuration mrConf = null;
+    private static FileSystem fs = null;
+    private static MiniMRCluster mrCluster = null;
+    private static boolean isServerRunning = false;
+    private static final int msPort = 20101;
+    private static HiveConf hcatConf;
+    private static HiveMetaStoreClient msc;
+    private static SecurityManager securityManager;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        Configuration conf = new Configuration(true);
+        conf.set("yarn.scheduler.capacity.root.queues", "default");
+        conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+        
+        fs = FileSystem.get(conf);
+        System.setProperty("hadoop.log.dir", new File(fs.getWorkingDirectory()
+                .toString(), "/logs").getAbsolutePath());
+        // LocalJobRunner does not work with mapreduce OutputCommitter. So need
+        // to use MiniMRCluster. MAPREDUCE-2350
+        mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+                new JobConf(conf));
+        mrConf = mrCluster.createJobConf();
+
+        if (isServerRunning) {
+            return;
+        }
+
+        MetaStoreUtils.startMetaStore(msPort, ShimLoader
+                .getHadoopThriftAuthBridge());
+        isServerRunning = true;
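+        // Install a security manager that prevents System.exit() calls from terminating the test JVM.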
+        securityManager = System.getSecurityManager();
+        System.setSecurityManager(new NoExitSecurityManager());
+
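+        // Point the client-side configuration at the standalone metastore started above.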
+        hcatConf = new HiveConf(TestHCatPartitionPublish.class);
+        hcatConf.set("hive.metastore.local", "false");
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+                + msPort);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES, 3);
+        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
+                "false");
+        msc = new HiveMetaStoreClient(hcatConf, null);
+        System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+        System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+    }
+
+    @AfterClass
+    public static void tearDown() throws IOException {
+        if (mrCluster != null) {
+            mrCluster.shutdown();
+        }
+        System.setSecurityManager(securityManager);
+        isServerRunning = false;
+    }
+
+    @Test
+    public void testPartitionPublish() throws Exception {
+        String dbName = "default";
+        String tableName = "testHCatPartitionedTable";
+        createTable(null, tableName);
+
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value1");
+        partitionMap.put("part0", "p0value1");
+
+        ArrayList<HCatFieldSchema> hcatTableColumns = new ArrayList<HCatFieldSchema>();
+        for (FieldSchema fs : getTableColumns()) {
+            hcatTableColumns.add(HCatSchemaUtils.getHCatFieldSchema(fs));
+        }
+
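+        // Run a write job whose mapper always throws; the partition must not be published
+        // and its directory must be cleaned up.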
+        runMRCreateFail(dbName, tableName, partitionMap, hcatTableColumns);
+        List<String> ptns = msc.listPartitionNames(dbName, tableName,
+                (short) 10);
+        Assert.assertEquals(0, ptns.size());
+        Table table = msc.getTable(dbName, tableName);
+        Assert.assertTrue(table != null);
+        // Also make sure that the partition directory under the table location
+        // has been deleted.
+        Assert.assertFalse(fs.exists(new Path(table.getSd().getLocation()
+                + "/part1=p1value1/part0=p0value1")));
+    }
+
+    void runMRCreateFail(
+        String dbName, String tableName, Map<String, String> partitionValues,
+        List<HCatFieldSchema> columns) throws Exception {
+
+        Job job = new Job(mrConf, "hcat mapreduce write fail test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(TestHCatPartitionPublish.MapFail.class);
+
+        // input/output settings
+        job.setInputFormatClass(TextInputFormat.class);
+
+        Path path = new Path(fs.getWorkingDirectory(),
+                "mapred/testHCatMapReduceInput");
+        // The write count does not matter, as the map will fail in its first
+        // call.
+        createInputFile(path, 5);
+
+        TextInputFormat.setInputPaths(job, path);
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName,
+                partitionValues);
+        HCatOutputFormat.setOutput(job, outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        HCatOutputFormat.setSchema(job, new HCatSchema(columns));
+
+        boolean success = job.waitForCompletion(true);
+        Assert.assertFalse(success);
+    }
+
+    private void createInputFile(Path path, int rowCount) throws IOException {
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
+        FSDataOutputStream os = fs.create(path);
+        for (int i = 0; i < rowCount; i++) {
+            os.writeChars(i + "\n");
+        }
+        os.close();
+    }
+
+    public static class MapFail extends
+            Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context)
+            throws IOException, InterruptedException {
+            throw new IOException("Exception to mimic job failure.");
+        }
+    }
+
+    private void createTable(String dbName, String tableName) throws Exception {
+        String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME
+                : dbName;
+        try {
+            msc.dropTable(databaseName, tableName);
+        } catch (Exception e) {
+        } // can fail with NoSuchObjectException
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType("MANAGED_TABLE");
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(getTableColumns());
+        tbl.setPartitionKeys(getPartitionKeys());
+        tbl.setSd(sd);
+        sd.setBucketCols(new ArrayList<String>(2));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+        sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName());
+        sd.setInputFormat(RCFileInputFormat.class.getName());
+        sd.setOutputFormat(RCFileOutputFormat.class.getName());
+
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tbl.setParameters(tableParams);
+
+        msc.createTable(tbl);
+    }
+
+    protected List<FieldSchema> getPartitionKeys() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        // Defining partition names in unsorted order
+        fields.add(new FieldSchema("PaRT1", serdeConstants.STRING_TYPE_NAME, ""));
+        fields.add(new FieldSchema("part0", serdeConstants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHCatPartitioned extends HCatMapReduceTest {
+
+    private static List<HCatRecord> writeRecords;
+    private static List<HCatFieldSchema> partitionColumns;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+
+        tableName = "testHCatPartitionedTable";
+        writeRecords = new ArrayList<HCatRecord>();
+
+        for (int i = 0; i < 20; i++) {
+            List<Object> objList = new ArrayList<Object>();
+
+            objList.add(i);
+            objList.add("strvalue" + i);
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
+
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+    }
+
+
+    @Override
+    protected List<FieldSchema> getPartitionKeys() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        //Defining partition names in unsorted order
+        fields.add(new FieldSchema("PaRT1", serdeConstants.STRING_TYPE_NAME, ""));
+        fields.add(new FieldSchema("part0", serdeConstants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    @Override
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+
+    @Test
+    public void testHCatPartitionedTable() throws Exception {
+
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value1");
+        partitionMap.put("part0", "p0value1");
+
+        runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
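+        // Publish a second partition using upper-case key names; partition key names should be matched case-insensitively.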
+        partitionMap.clear();
+        partitionMap.put("PART1", "p1value2");
+        partitionMap.put("PART0", "p0value2");
+
+        runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+
+        //Test for duplicate publish
+        IOException exc = null;
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType());
+
+        //Test for publish with invalid partition key name
+        exc = null;
+        partitionMap.clear();
+        partitionMap.put("px1", "p1value2");
+        partitionMap.put("px0", "p0value2");
+
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
+
+        //Test for publish with missing partition key values
+        exc = null;
+        partitionMap.clear();
+        partitionMap.put("px", "p1value2");
+
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+
+        //Test for null partition value map
+        exc = null;
+        try {
+            runMRCreate(null, partitionColumns, writeRecords, 20, false);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc == null);
+//    assertTrue(exc instanceof HCatException);
+//    assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
+        // With dynamic partitioning, it is not an error that the partition key values were left unspecified.
+
+        //Read should get 10 + 20 rows
+        runMRRead(30);
+
+        //Read with partition filter
+        runMRRead(10, "part1 = \"p1value1\"");
+        runMRRead(20, "part1 = \"p1value2\"");
+        runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+        runMRRead(10, "part0 = \"p0value1\"");
+        runMRRead(20, "part0 = \"p0value2\"");
+        runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+
+        tableSchemaTest();
+        columnOrderChangeTest();
+        hiveReadTest();
+    }
+
+
+    //test that new columns get added to the table schema
+    private void tableSchemaTest() throws Exception {
+
+        HCatSchema tableSchema = getTableSchema();
+
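+        // 4 fields: the two data columns (c1, c2) plus the two partition keys (part1, part0).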
+        assertEquals(4, tableSchema.getFields().size());
+
+        //Update partition schema to have 3 fields
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", serdeConstants.STRING_TYPE_NAME, "")));
+
+        writeRecords = new ArrayList<HCatRecord>();
+
+        for (int i = 0; i < 20; i++) {
+            List<Object> objList = new ArrayList<Object>();
+
+            objList.add(i);
+            objList.add("strvalue" + i);
+            objList.add("str2value" + i);
+
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
+
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value5");
+        partitionMap.put("part0", "p0value5");
+
+        runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
+        tableSchema = getTableSchema();
+
+        //assert that c3 has been added to the table schema
+        assertEquals(5, tableSchema.getFields().size());
+        assertEquals("c1", tableSchema.getFields().get(0).getName());
+        assertEquals("c2", tableSchema.getFields().get(1).getName());
+        assertEquals("c3", tableSchema.getFields().get(2).getName());
+        assertEquals("part1", tableSchema.getFields().get(3).getName());
+        assertEquals("part0", tableSchema.getFields().get(4).getName());
+
+        //Test that changing column data type fails
+        partitionMap.clear();
+        partitionMap.put("part1", "p1value6");
+        partitionMap.put("part0", "p0value6");
+
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.INT_TYPE_NAME, "")));
+
+        IOException exc = null;
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType());
+
+        //Test that a partition key is not allowed in the data columns
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", serdeConstants.STRING_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", serdeConstants.STRING_TYPE_NAME, "")));
+
+        List<HCatRecord> recordsContainingPartitionCols = new ArrayList<HCatRecord>(20);
+        for (int i = 0; i < 20; i++) {
+            List<Object> objList = new ArrayList<Object>();
+
+            objList.add(i);
+            objList.add("c2value" + i);
+            objList.add("c3value" + i);
+            objList.add("p1value6");
+
+            recordsContainingPartitionCols.add(new DefaultHCatRecord(objList));
+        }
+
+        exc = null;
+        try {
+            runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        List<HCatRecord> records = runMRRead(20, "part1 = \"p1value6\"");
+        assertEquals(20, records.size());
+        records = runMRRead(20, "part0 = \"p0value6\"");
+        assertEquals(20, records.size());
+        Integer i = 0;
+        for (HCatRecord rec : records) {
+            assertEquals(5, rec.size());
+            assertTrue(rec.get(0).equals(i));
+            assertTrue(rec.get(1).equals("c2value" + i));
+            assertTrue(rec.get(2).equals("c3value" + i));
+            assertTrue(rec.get(3).equals("p1value6"));
+            assertTrue(rec.get(4).equals("p0value6"));
+            i++;
+        }
+    }
+
+    //check behavior when the order of columns is changed
+    private void columnOrderChangeTest() throws Exception {
+
+        HCatSchema tableSchema = getTableSchema();
+
+        assertEquals(5, tableSchema.getFields().size());
+
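+        // Write with the columns in a different order (c1, c3, c2) than the table schema; this should be rejected.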
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", serdeConstants.STRING_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+
+
+        writeRecords = new ArrayList<HCatRecord>();
+
+        for (int i = 0; i < 10; i++) {
+            List<Object> objList = new ArrayList<Object>();
+
+            objList.add(i);
+            objList.add("co strvalue" + i);
+            objList.add("co str2value" + i);
+
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
+
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value8");
+        partitionMap.put("part0", "p0value8");
+
+        Exception exc = null;
+        try {
+            runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+        } catch (IOException e) {
+            exc = e;
+        }
+
+        assertTrue(exc != null);
+        assertTrue(exc instanceof HCatException);
+        assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType());
+
+
+        partitionColumns = new ArrayList<HCatFieldSchema>();
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+        partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+
+        writeRecords = new ArrayList<HCatRecord>();
+
+        for (int i = 0; i < 10; i++) {
+            List<Object> objList = new ArrayList<Object>();
+
+            objList.add(i);
+            objList.add("co strvalue" + i);
+
+            writeRecords.add(new DefaultHCatRecord(objList));
+        }
+
+        runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
+        //Read should get 10 + 20 + 10 + 10 + 20 rows
+        runMRRead(70);
+    }
+
+    //Test that data inserted through HCatOutputFormat is readable from Hive
+    private void hiveReadTest() throws Exception {
+
+        String query = "select * from " + tableName;
+        int retCode = driver.run(query).getResponseCode();
+
+        if (retCode != 0) {
+            throw new Exception("Error " + retCode + " running query " + query);
+        }
+
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        assertEquals(70, res.size());
+    }
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestInputJobInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestInputJobInfo.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestInputJobInfo.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestInputJobInfo.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class TestInputJobInfo extends HCatBaseTest {
+
+    @Test
+    public void test4ArgCreate() throws Exception {
+        Properties p = new Properties();
+        p.setProperty("key", "value");
+        InputJobInfo jobInfo = InputJobInfo.create("Db", "Table", "Filter", p);
+        Assert.assertEquals("Db", jobInfo.getDatabaseName());
+        Assert.assertEquals("Table", jobInfo.getTableName());
+        Assert.assertEquals("Filter", jobInfo.getFilter());
+        Assert.assertEquals("value", jobInfo.getProperties().getProperty("key"));
+    }
+
+    @Test
+    public void test3ArgCreate() throws Exception {
+        InputJobInfo jobInfo = InputJobInfo.create("Db", "Table", "Filter");
+        Assert.assertEquals("Db", jobInfo.getDatabaseName());
+        Assert.assertEquals("Table", jobInfo.getTableName());
+        Assert.assertEquals("Filter", jobInfo.getFilter());
+        Assert.assertEquals(0, jobInfo.getProperties().size());
+    }
+}