Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC
svn commit: r1520466 [11/18] - in /hive/trunk/hcatalog:
core/src/main/java/org/apache/hcatalog/cli/
core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/
core/src/main/java/org/apache/hcatalog/common/
core/src/main/java/org/apache/hcatalog/data/...
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java.broken (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java.broken Fri Sep 6 00:49:14 2013
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+/**
+ *
+ * TestHCatEximOutputFormat. Covers some basic cases here; more thorough
+ * testing is done via TestHCatEximInputFormat.
+ *
+ */
+public class TestHCatEximOutputFormat extends TestCase {
+
+ public static class TestMap extends
+ Mapper<LongWritable, Text, LongWritable, HCatRecord> {
+
+ private HCatSchema recordSchema;
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
+ recordSchema = HCatEximOutputFormat.getTableSchema(context);
+ System.out.println("TestMap/setup called");
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String[] cols = value.toString().split(",");
+ HCatRecord record = new DefaultHCatRecord(recordSchema.size());
+ System.out.println("TestMap/map called. Cols[0]:" + cols[0]);
+ System.out.println("TestMap/map called. Cols[1]:" + cols[1]);
+ System.out.println("TestMap/map called. Cols[2]:" + cols[2]);
+ System.out.println("TestMap/map called. Cols[3]:" + cols[3]);
+ record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0]));
+ record.setString("emp_name", recordSchema, cols[1]);
+ record.setString("emp_dob", recordSchema, cols[2]);
+ record.setString("emp_sex", recordSchema, cols[3]);
+ context.write(key, record);
+ }
+ }
+
+
+ private static final String dbName = "hcatEximOutputFormatTestDB";
+ private static final String tblName = "hcatEximOutputFormatTestTable";
+ Configuration conf;
+ Job job;
+ List<HCatFieldSchema> columns;
+ HCatSchema schema;
+ FileSystem fs;
+ Path outputLocation;
+ Path dataLocation;
+
+ public void testNonPart() throws Exception {
+ try {
+ HCatEximOutputFormat.setOutput(
+ job,
+ dbName,
+ tblName,
+ outputLocation.toString(),
+ null,
+ null,
+ schema);
+
+ job.waitForCompletion(true);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
+ committer.cleanupJob(job);
+
+ Path metadataPath = new Path(outputLocation, "_metadata");
+ Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
+ Table table = rv.getKey();
+ List<Partition> partitions = rv.getValue();
+
+ assertEquals(dbName, table.getDbName());
+ assertEquals(tblName, table.getTableName());
+ assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+ HCatUtil.getFieldSchemaList(columns)));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+ table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+ table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+ assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+ table.getSd().getInputFormat());
+ assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+ table.getSd().getOutputFormat());
+ assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+ table.getSd().getSerdeInfo().getSerializationLib());
+ assertEquals(0, table.getPartitionKeys().size());
+
+ assertEquals(0, partitions.size());
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+
+ }
+
+ public void testPart() throws Exception {
+ try {
+ List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>();
+ partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_country",
+ Constants.STRING_TYPE_NAME, "")));
+ partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_state",
+ Constants.STRING_TYPE_NAME, "")));
+ HCatSchema partitionSchema = new HCatSchema(partKeys);
+
+ List<String> partitionVals = new ArrayList<String>();
+ partitionVals.add("IN");
+ partitionVals.add("TN");
+
+ HCatEximOutputFormat.setOutput(
+ job,
+ dbName,
+ tblName,
+ outputLocation.toString(),
+ partitionSchema,
+ partitionVals,
+ schema);
+
+ job.waitForCompletion(true);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
+ committer.cleanupJob(job);
+ Path metadataPath = new Path(outputLocation, "_metadata");
+ Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
+ Table table = rv.getKey();
+ List<Partition> partitions = rv.getValue();
+
+ assertEquals(dbName, table.getDbName());
+ assertEquals(tblName, table.getTableName());
+ assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+ HCatUtil.getFieldSchemaList(columns)));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+ table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+ table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+ assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+ table.getSd().getInputFormat());
+ assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+ table.getSd().getOutputFormat());
+ assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+ table.getSd().getSerdeInfo().getSerializationLib());
+ assertEquals(2, table.getPartitionKeys().size());
+ List<FieldSchema> partSchema = table.getPartitionKeys();
+ assertEquals("emp_country", partSchema.get(0).getName());
+ assertEquals("emp_state", partSchema.get(1).getName());
+
+ assertEquals(1, partitions.size());
+ Partition partition = partitions.get(0);
+ assertEquals("IN", partition.getValues().get(0));
+ assertEquals("TN", partition.getValues().get(1));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+ partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+ assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+ partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ System.out.println("Setup started");
+ super.setUp();
+ conf = new Configuration();
+ job = new Job(conf, "test eximoutputformat");
+ columns = new ArrayList<HCatFieldSchema>();
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+ Constants.INT_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+ Constants.STRING_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
+ Constants.STRING_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
+ Constants.STRING_TYPE_NAME, "")));
+ schema = new HCatSchema(columns);
+
+ fs = new LocalFileSystem();
+ fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+ outputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports");
+ if (fs.exists(outputLocation)) {
+ fs.delete(outputLocation, true);
+ }
+ dataLocation = new Path(fs.getWorkingDirectory(), "tmp/data");
+ if (fs.exists(dataLocation)) {
+ fs.delete(dataLocation, true);
+ }
+ FSDataOutputStream ds = fs.create(dataLocation, true);
+ ds.writeBytes("237,Krishna,01/01/1990,M,IN,TN\n");
+ ds.writeBytes("238,Kalpana,01/01/2000,F,IN,KA\n");
+ ds.writeBytes("239,Satya,01/01/2001,M,US,TN\n");
+ ds.writeBytes("240,Kavya,01/01/2002,F,US,KA\n");
+ ds.close();
+
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(HCatEximOutputFormat.class);
+ TextInputFormat.setInputPaths(job, dataLocation);
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(TestMap.class);
+ job.setNumReduceTasks(0);
+ System.out.println("Setup done");
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ System.out.println("Teardown started");
+ super.tearDown();
+ fs.delete(dataLocation, true);
+ fs.delete(outputLocation, true);
+ System.out.println("Teardown done");
+ }
+}
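
TestMap above fills each HCatRecord by column name against the schema pulled from the job context. DefaultHCatRecord also accepts positional assignment, as the MultiOutputFormat test later in this commit does; a minimal sketch of an equivalent mapper, assuming the column order matches the emp_id, emp_name, emp_dob, emp_sex schema built in setUp():

    // Positional variant of TestMap; positions follow the setUp() schema order.
    public static class PositionalTestMap extends
        Mapper<LongWritable, Text, LongWritable, HCatRecord> {
      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String[] cols = value.toString().split(",");
        HCatRecord record = new DefaultHCatRecord(4);
        record.set(0, Integer.parseInt(cols[0])); // emp_id
        record.set(1, cols[1]);                   // emp_name
        record.set(2, cols[2]);                   // emp_dob
        record.set(3, cols[3]);                   // emp_sex
        context.write(key, record);
      }
    }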
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatHiveCompatibility extends HCatBaseTest {
+ private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";
+
+ @BeforeClass
+ public static void createInputData() throws Exception {
+ int LOOP_SIZE = 11;
+ File file = new File(INPUT_FILE_NAME);
+ file.deleteOnExit();
+ FileWriter writer = new FileWriter(file);
+ for (int i = 0; i < LOOP_SIZE; i++) {
+ writer.write(i + "\t1\n");
+ }
+ writer.close();
+ }
+
+ @Test
+ public void testUnpartedReadWrite() throws Exception {
+
+ driver.run("drop table if exists junit_unparted_noisd");
+ String createTable = "create table junit_unparted_noisd(a int) stored as RCFILE";
+ Assert.assertEquals(0, driver.run(createTable).getResponseCode());
+
+ // assert that the table created has no hcat instrumentation, and that we're still able to read it.
+ Table table = client.getTable("default", "junit_unparted_noisd");
+ Assert.assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ logAndRegister(server, "A = load '" + INPUT_FILE_NAME + "' as (a:int);");
+ logAndRegister(server, "store A into 'default.junit_unparted_noisd' using org.apache.hcatalog.pig.HCatStorer();");
+ logAndRegister(server, "B = load 'default.junit_unparted_noisd' using org.apache.hcatalog.pig.HCatLoader();");
+ Iterator<Tuple> itr = server.openIterator("B");
+
+ int i = 0;
+
+ while (itr.hasNext()) {
+ Tuple t = itr.next();
+ Assert.assertEquals(1, t.size());
+ Assert.assertEquals(t.get(0), i);
+ i++;
+ }
+
+ Assert.assertFalse(itr.hasNext());
+ Assert.assertEquals(11, i);
+
+ // assert that the table created still has no hcat instrumentation
+ Table table2 = client.getTable("default", "junit_unparted_noisd");
+ Assert.assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+ driver.run("drop table junit_unparted_noisd");
+ }
+
+ @Test
+ public void testPartedRead() throws Exception {
+
+ driver.run("drop table if exists junit_parted_noisd");
+ String createTable = "create table junit_parted_noisd(a int) partitioned by (b string) stored as RCFILE";
+ Assert.assertEquals(0, driver.run(createTable).getResponseCode());
+
+ // assert that the table created has no hcat instrumentation, and that we're still able to read it.
+ Table table = client.getTable("default", "junit_parted_noisd");
+ Assert.assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ logAndRegister(server, "A = load '" + INPUT_FILE_NAME + "' as (a:int);");
+ logAndRegister(server, "store A into 'default.junit_parted_noisd' using org.apache.hcatalog.pig.HCatStorer('b=42');");
+ logAndRegister(server, "B = load 'default.junit_parted_noisd' using org.apache.hcatalog.pig.HCatLoader();");
+ Iterator<Tuple> itr = server.openIterator("B");
+
+ int i = 0;
+
+ while (itr.hasNext()) {
+ Tuple t = itr.next();
+ Assert.assertEquals(2, t.size()); // Contains explicit field "a" and partition "b".
+ Assert.assertEquals(t.get(0), i);
+ Assert.assertEquals(t.get(1), "42");
+ i++;
+ }
+
+ Assert.assertFalse(itr.hasNext());
+ Assert.assertEquals(11, i);
+
+ // assert that the table created still has no hcat instrumentation
+ Table table2 = client.getTable("default", "junit_parted_noisd");
+ Assert.assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+ // assert that there is one partition present, and it had hcat instrumentation inserted when it was created.
+ Partition ptn = client.getPartition("default", "junit_parted_noisd", Arrays.asList("42"));
+
+ Assert.assertNotNull(ptn);
+ Assert.assertTrue(ptn.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+ driver.run("drop table junit_unparted_noisd");
+ }
+}
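
Both tests drive Pig through logAndRegister(), a helper inherited from HCatBaseTest that is not part of this hunk. A minimal sketch of the pattern it presumably follows (the log message is an assumption):

    // Hypothetical stand-in for HCatBaseTest.logAndRegister(); the real helper is defined elsewhere.
    protected static void logAndRegister(PigServer server, String query) throws IOException {
      LOG.info("Registering pig query: " + query);
      server.registerQuery(query);
    }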
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Iterator;
+
+public class TestHCatHiveThriftCompatibility extends HCatBaseTest {
+
+ private boolean setUpComplete = false;
+ private Path intStringSeq;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ if (setUpComplete) {
+ return;
+ }
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ TIOStreamTransport transport = new TIOStreamTransport(out);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ IntString intString = new IntString(1, "one", 1);
+ intString.write(protocol);
+ BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+
+ intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+ LOG.info("Creating data file: " + intStringSeq);
+
+ SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+ intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+ NullWritable.class, BytesWritable.class);
+ seqFileWriter.append(NullWritable.get(), bytesWritable);
+ seqFileWriter.close();
+
+ setUpComplete = true;
+ }
+
+ /**
+ * Create a table with no explicit schema and ensure the schema is correctly
+ * discovered from the thrift struct.
+ */
+ @Test
+ public void testDynamicCols() throws Exception {
+ Assert.assertEquals(0, driver.run("drop table if exists test_thrift").getResponseCode());
+ Assert.assertEquals(0, driver.run(
+ "create external table test_thrift " +
+ "partitioned by (year string) " +
+ "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+ "with serdeproperties ( " +
+ " 'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+ " 'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+ "stored as" +
+ " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+ " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+ .getResponseCode());
+ Assert.assertEquals(0,
+ driver.run("alter table test_thrift add partition (year = '2012') location '" +
+ intStringSeq.getParent() + "'").getResponseCode());
+
+ PigServer pigServer = new PigServer(ExecType.LOCAL);
+ pigServer.registerQuery("A = load 'test_thrift' using org.apache.hcatalog.pig.HCatLoader();");
+
+ Schema expectedSchema = new Schema();
+ expectedSchema.add(new Schema.FieldSchema("myint", DataType.INTEGER));
+ expectedSchema.add(new Schema.FieldSchema("mystring", DataType.CHARARRAY));
+ expectedSchema.add(new Schema.FieldSchema("underscore_int", DataType.INTEGER));
+ expectedSchema.add(new Schema.FieldSchema("year", DataType.CHARARRAY));
+
+ Assert.assertEquals(expectedSchema, pigServer.dumpSchema("A"));
+
+ Iterator<Tuple> iterator = pigServer.openIterator("A");
+ Tuple t = iterator.next();
+ Assert.assertEquals(1, t.get(0));
+ Assert.assertEquals("one", t.get(1));
+ Assert.assertEquals(1, t.get(2));
+ Assert.assertEquals("2012", t.get(3));
+
+ Assert.assertFalse(iterator.hasNext());
+ }
+}
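
setUp() serializes a single IntString struct with TBinaryProtocol before appending it to the sequence file; the ThriftDeserializer named in the table's serdeproperties reverses that step at read time. A minimal sketch of the same round trip done by hand, assuming the generated IntString class follows the usual Thrift read/write contract (needs java.io.ByteArrayInputStream and may throw TException):

    // Serialize an IntString, then read it back from the raw bytes.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    new IntString(1, "one", 1).write(new TBinaryProtocol(new TIOStreamTransport(out)));

    IntString copy = new IntString();
    copy.read(new TBinaryProtocol(
        new TIOStreamTransport(new ByteArrayInputStream(out.toByteArray()))));
    // copy now carries the same three values (1, "one", 1) that the test expects via Pig.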
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class TestHCatInputFormat extends HCatBaseTest {
+
+ private boolean setUpComplete = false;
+
+ /**
+ * Create an input sequence file with 100 records; every 10th record is bad.
+ * Load this file into a Hive table.
+ */
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ if (setUpComplete) {
+ return;
+ }
+
+ Path intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+ LOG.info("Creating data file: " + intStringSeq);
+ SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+ intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+ NullWritable.class, BytesWritable.class);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ TIOStreamTransport transport = new TIOStreamTransport(out);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ for (int i = 1; i <= 100; i++) {
+ if (i % 10 == 0) {
+ seqFileWriter.append(NullWritable.get(), new BytesWritable("bad record".getBytes()));
+ } else {
+ out.reset();
+ IntString intString = new IntString(i, Integer.toString(i), i);
+ intString.write(protocol);
+ BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+ seqFileWriter.append(NullWritable.get(), bytesWritable);
+ }
+ }
+
+ seqFileWriter.close();
+
+ // Now let's load this file into a new Hive table.
+ Assert.assertEquals(0, driver.run("drop table if exists test_bad_records").getResponseCode());
+ Assert.assertEquals(0, driver.run(
+ "create table test_bad_records " +
+ "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+ "with serdeproperties ( " +
+ " 'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+ " 'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+ "stored as" +
+ " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+ " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+ .getResponseCode());
+ Assert.assertEquals(0, driver.run("load data local inpath '" + intStringSeq.getParent() +
+ "' into table test_bad_records").getResponseCode());
+
+ setUpComplete = true;
+ }
+
+ @Test
+ public void testBadRecordHandlingPasses() throws Exception {
+ Assert.assertTrue(runJob(0.1f));
+ }
+
+ @Test
+ public void testBadRecordHandlingFails() throws Exception {
+ Assert.assertFalse(runJob(0.01f));
+ }
+
+ private boolean runJob(float badRecordThreshold) throws Exception {
+ Configuration conf = new Configuration();
+
+ conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, badRecordThreshold);
+
+ Job job = new Job(conf);
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(MyMapper.class);
+
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ HCatInputFormat.setInput(job, "default", "test_bad_records");
+
+ job.setMapOutputKeyClass(HCatRecord.class);
+ job.setMapOutputValueClass(HCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ Path path = new Path(TEST_DATA_DIR, "test_bad_record_handling_output");
+ if (path.getFileSystem(conf).exists(path)) {
+ path.getFileSystem(conf).delete(path, true);
+ }
+
+ TextOutputFormat.setOutputPath(job, path);
+
+ return job.waitForCompletion(true);
+ }
+
+ public static class MyMapper extends Mapper<NullWritable, HCatRecord, NullWritable, Text> {
+ @Override
+ public void map(NullWritable key, HCatRecord value, Context context)
+ throws IOException, InterruptedException {
+ LOG.info("HCatRecord: " + value);
+ context.write(NullWritable.get(), new Text(value.toString()));
+ }
+ }
+}
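
Both jobs above read the same input: 100 records, of which every 10th (10 in all) cannot be deserialized, i.e. a bad-record fraction of 0.10. The two tests imply threshold semantics roughly like the check below; this is a sketch of the assumed behaviour behind HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, not the actual HCatInputFormat code:

    // 10 bad records out of 100 gives a bad-record fraction of 0.10.
    float badFraction = 10 / 100f;
    boolean passesAtTenPercent = badFraction <= 0.1f;   // true  -> testBadRecordHandlingPasses
    boolean passesAtOnePercent = badFraction <= 0.01f;  // false -> testBadRecordHandlingFails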
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHCatMultiOutputFormat {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestHCatMultiOutputFormat.class);
+
+ private static final String DATABASE = "default";
+ private static final String[] tableNames = {"test1", "test2", "test3"};
+ private static final String[] tablePerms = {"755", "750", "700"};
+ private static Path warehousedir = null;
+ private static HashMap<String, HCatSchema> schemaMap = new HashMap<String, HCatSchema>();
+ private static HiveMetaStoreClient hmsc;
+ private static MiniMRCluster mrCluster;
+ private static Configuration mrConf;
+ private static HiveConf hiveConf;
+ private static File workDir;
+
+ private static final String msPort = "20199";
+ private static Thread t;
+
+ static {
+ schemaMap.put(tableNames[0], new HCatSchema(ColumnHolder.hCattest1Cols));
+ schemaMap.put(tableNames[1], new HCatSchema(ColumnHolder.hCattest2Cols));
+ schemaMap.put(tableNames[2], new HCatSchema(ColumnHolder.hCattest3Cols));
+ }
+
+ private static class RunMS implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ String warehouseConf = HiveConf.ConfVars.METASTOREWAREHOUSE.varname + "="
+ + warehousedir.toString();
+ HiveMetaStore.main(new String[]{"-v", "-p", msPort, "--hiveconf", warehouseConf});
+ } catch (Throwable t) {
+ System.err.println("Exiting. Got exception from metastore: " + t.getMessage());
+ }
+ }
+
+ }
+
+ /**
+ * Private class which holds all the data for the test cases
+ */
+ private static class ColumnHolder {
+
+ private static ArrayList<HCatFieldSchema> hCattest1Cols = new ArrayList<HCatFieldSchema>();
+ private static ArrayList<HCatFieldSchema> hCattest2Cols = new ArrayList<HCatFieldSchema>();
+ private static ArrayList<HCatFieldSchema> hCattest3Cols = new ArrayList<HCatFieldSchema>();
+
+ private static ArrayList<FieldSchema> partitionCols = new ArrayList<FieldSchema>();
+ private static ArrayList<FieldSchema> test1Cols = new ArrayList<FieldSchema>();
+ private static ArrayList<FieldSchema> test2Cols = new ArrayList<FieldSchema>();
+ private static ArrayList<FieldSchema> test3Cols = new ArrayList<FieldSchema>();
+
+ private static HashMap<String, List<FieldSchema>> colMapping = new HashMap<String, List<FieldSchema>>();
+
+ static {
+ try {
+ FieldSchema keyCol = new FieldSchema("key", serdeConstants.STRING_TYPE_NAME, "");
+ test1Cols.add(keyCol);
+ test2Cols.add(keyCol);
+ test3Cols.add(keyCol);
+ hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+ hCattest2Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+ hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+ FieldSchema valueCol = new FieldSchema("value", serdeConstants.STRING_TYPE_NAME, "");
+ test1Cols.add(valueCol);
+ test3Cols.add(valueCol);
+ hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol));
+ hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol));
+ FieldSchema extraCol = new FieldSchema("extra", serdeConstants.STRING_TYPE_NAME, "");
+ test3Cols.add(extraCol);
+ hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(extraCol));
+ colMapping.put("test1", test1Cols);
+ colMapping.put("test2", test2Cols);
+ colMapping.put("test3", test3Cols);
+ } catch (HCatException e) {
+ LOG.error("Error in setting up schema fields for the table", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ static {
+ partitionCols.add(new FieldSchema("ds", serdeConstants.STRING_TYPE_NAME, ""));
+ partitionCols.add(new FieldSchema("cluster", serdeConstants.STRING_TYPE_NAME, ""));
+ }
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ String testDir = System.getProperty("test.data.dir", "./");
+ testDir = testDir + "/test_multitable_" + Math.abs(new Random().nextLong()) + "/";
+ workDir = new File(new File(testDir).getCanonicalPath());
+ FileUtil.fullyDelete(workDir);
+ workDir.mkdirs();
+
+ warehousedir = new Path(workDir + "/warehouse");
+
+ // Run hive metastore server
+ t = new Thread(new RunMS());
+ t.start();
+
+ // LocalJobRunner does not work with the mapreduce OutputCommitter, so a
+ // MiniMRCluster is needed instead. See MAPREDUCE-2350.
+ Configuration conf = new Configuration(true);
+ conf.set("yarn.scheduler.capacity.root.queues", "default");
+ conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+
+ FileSystem fs = FileSystem.get(conf);
+ System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
+ mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
+ mrConf = mrCluster.createJobConf();
+ fs.mkdirs(warehousedir);
+
+ initializeSetup();
+ }
+
+ private static void initializeSetup() throws Exception {
+
+ hiveConf = new HiveConf(mrConf, TestHCatMultiOutputFormat.class);
+ hiveConf.set("hive.metastore.local", "false");
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
+ hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+ hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES, 3);
+ hiveConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+ System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousedir.toString());
+ try {
+ hmsc = new HiveMetaStoreClient(hiveConf, null);
+ initializeTables();
+ } catch (Throwable e) {
+ LOG.error("Exception encountered while setting up testcase", e);
+ throw new Exception(e);
+ } finally {
+ hmsc.close();
+ }
+ }
+
+ private static void initializeTables() throws Exception {
+ for (String table : tableNames) {
+ try {
+ if (hmsc.getTable(DATABASE, table) != null) {
+ hmsc.dropTable(DATABASE, table);
+ }
+ } catch (NoSuchObjectException ignored) {
+ }
+ }
+ for (int i = 0; i < tableNames.length; i++) {
+ createTable(tableNames[i], tablePerms[i]);
+ }
+ }
+
+ private static void createTable(String tableName, String tablePerm) throws Exception {
+ Table tbl = new Table();
+ tbl.setDbName(DATABASE);
+ tbl.setTableName(tableName);
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(ColumnHolder.colMapping.get(tableName));
+ tbl.setSd(sd);
+ sd.setParameters(new HashMap<String, String>());
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
+ sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
+ sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(
+ org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+ tbl.setPartitionKeys(ColumnHolder.partitionCols);
+
+ hmsc.createTable(tbl);
+ FileSystem fs = FileSystem.get(mrConf);
+ fs.setPermission(new Path(warehousedir, tableName), new FsPermission(tablePerm));
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ FileUtil.fullyDelete(workDir);
+ FileSystem fs = FileSystem.get(mrConf);
+ if (fs.exists(warehousedir)) {
+ fs.delete(warehousedir, true);
+ }
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ }
+
+ /**
+ * Simple test case.
+ * <ol>
+ * <li>Submits a mapred job which writes out one fixed line to each of the tables</li>
+ * <li>Uses a Hive fetch task to read the data back and checks that it matches what was written</li>
+ * </ol>
+ *
+ * @throws Exception if any error occurs
+ */
+ @Test
+ public void testOutputFormat() throws Throwable {
+ HashMap<String, String> partitionValues = new HashMap<String, String>();
+ partitionValues.put("ds", "1");
+ partitionValues.put("cluster", "ag");
+ ArrayList<OutputJobInfo> infoList = new ArrayList<OutputJobInfo>();
+ infoList.add(OutputJobInfo.create("default", tableNames[0], partitionValues));
+ infoList.add(OutputJobInfo.create("default", tableNames[1], partitionValues));
+ infoList.add(OutputJobInfo.create("default", tableNames[2], partitionValues));
+
+ Job job = new Job(hiveConf, "SampleJob");
+
+ job.setMapperClass(MyMapper.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(MultiOutputFormat.class);
+ job.setNumReduceTasks(0);
+
+ JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+
+ for (int i = 0; i < tableNames.length; i++) {
+ configurer.addOutputFormat(tableNames[i], HCatOutputFormat.class, BytesWritable.class,
+ HCatRecord.class);
+ HCatOutputFormat.setOutput(configurer.getJob(tableNames[i]), infoList.get(i));
+ HCatOutputFormat.setSchema(configurer.getJob(tableNames[i]),
+ schemaMap.get(tableNames[i]));
+ }
+ configurer.configure();
+
+ Path filePath = createInputFile();
+ FileInputFormat.addInputPath(job, filePath);
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ ArrayList<String> outputs = new ArrayList<String>();
+ for (String tbl : tableNames) {
+ outputs.add(getTableData(tbl, "default").get(0));
+ }
+ Assert.assertEquals("Comparing output of table " +
+ tableNames[0] + " is not correct", outputs.get(0), "a,a,1,ag");
+ Assert.assertEquals("Comparing output of table " +
+ tableNames[1] + " is not correct", outputs.get(1), "a,1,ag");
+ Assert.assertEquals("Comparing output of table " +
+ tableNames[2] + " is not correct", outputs.get(2), "a,a,extra,1,ag");
+
+ // Check permissions on the partition dirs and files created
+ for (int i = 0; i < tableNames.length; i++) {
+ Path partitionFile = new Path(warehousedir + "/" + tableNames[i]
+ + "/ds=1/cluster=ag/part-m-00000");
+ FileSystem fs = partitionFile.getFileSystem(mrConf);
+ Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct",
+ fs.getFileStatus(partitionFile).getPermission(),
+ new FsPermission(tablePerms[i]));
+ Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct",
+ fs.getFileStatus(partitionFile.getParent()).getPermission(),
+ new FsPermission(tablePerms[i]));
+ Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct",
+ fs.getFileStatus(partitionFile.getParent().getParent()).getPermission(),
+ new FsPermission(tablePerms[i]));
+
+ }
+ LOG.info("File permissions verified");
+ }
+
+ /**
+ * Create a input file for map
+ *
+ * @return absolute path of the file.
+ * @throws IOException if any error encountered
+ */
+ private Path createInputFile() throws IOException {
+ Path f = new Path(workDir + "/MultiTableInput.txt");
+ FileSystem fs = FileSystem.get(mrConf);
+ if (fs.exists(f)) {
+ fs.delete(f, true);
+ }
+ OutputStream out = fs.create(f);
+ for (int i = 0; i < 3; i++) {
+ out.write("a,a\n".getBytes());
+ }
+ out.close();
+ return f;
+ }
+
+ /**
+ * Method to fetch table data
+ *
+ * @param table table name
+ * @param database database name
+ * @return list of rows, with columns comma-separated
+ * @throws Exception if any error occurs
+ */
+ private List<String> getTableData(String table, String database) throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.addResource("hive-site.xml");
+ ArrayList<String> results = new ArrayList<String>();
+ ArrayList<String> temp = new ArrayList<String>();
+ Hive hive = Hive.get(conf);
+ org.apache.hadoop.hive.ql.metadata.Table tbl = hive.getTable(database, table);
+ FetchWork work;
+ if (!tbl.getPartCols().isEmpty()) {
+ List<Partition> partitions = hive.getPartitions(tbl);
+ List<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
+ List<String> partLocs = new ArrayList<String>();
+ for (Partition part : partitions) {
+ partLocs.add(part.getLocation());
+ partDesc.add(Utilities.getPartitionDesc(part));
+ }
+ work = new FetchWork(partLocs, partDesc, Utilities.getTableDesc(tbl));
+ work.setLimit(100);
+ } else {
+ work = new FetchWork(tbl.getDataLocation().toString(), Utilities.getTableDesc(tbl));
+ }
+ FetchTask task = new FetchTask();
+ task.setWork(work);
+ task.initialize(conf, null, null);
+ task.fetch(temp);
+ for (String str : temp) {
+ results.add(str.replace("\t", ","));
+ }
+ return results;
+ }
+
+ private static class MyMapper extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ private int i = 0;
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ HCatRecord record = null;
+ String[] splits = value.toString().split(",");
+ switch (i) {
+ case 0:
+ record = new DefaultHCatRecord(2);
+ record.set(0, splits[0]);
+ record.set(1, splits[1]);
+ break;
+ case 1:
+ record = new DefaultHCatRecord(1);
+ record.set(0, splits[0]);
+ break;
+ case 2:
+ record = new DefaultHCatRecord(3);
+ record.set(0, splits[0]);
+ record.set(1, splits[1]);
+ record.set(2, "extra");
+ break;
+ default:
+ Assert.fail("This should not happen!!!!!");
+ }
+ MultiOutputFormat.write(tableNames[i], null, record, context);
+ i++;
+ }
+ }
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHCatNonPartitioned extends HCatMapReduceTest {
+
+ private static List<HCatRecord> writeRecords;
+ static List<HCatFieldSchema> partitionColumns;
+
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+
+ dbName = null; //test if null dbName works ("default" is used)
+ tableName = "testHCatNonPartitionedTable";
+
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for (int i = 0; i < 20; i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("strvalue" + i);
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+ }
+
+ @Override
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ //empty list, non partitioned
+ return fields;
+ }
+
+ @Override
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+
+ @Test
+ public void testHCatNonPartitionedTable() throws Exception {
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ runMRCreate(null, partitionColumns, writeRecords, 10, true);
+
+ //Test for duplicate publish
+ IOException exc = null;
+ try {
+ runMRCreate(null, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
+
+ //Test for publish with invalid partition key name
+ exc = null;
+ partitionMap.clear();
+ partitionMap.put("px", "p1value2");
+
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+ //Read should get 10 rows
+ runMRRead(10);
+
+ hiveReadTest();
+ }
+
+ //Test that data inserted through hcatoutputformat is readable from hive
+ private void hiveReadTest() throws Exception {
+
+ String query = "select * from " + tableName;
+ int retCode = driver.run(query).getResponseCode();
+
+ if (retCode != 0) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(10, res.size());
+ }
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHCatOutputFormat extends TestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestHCatOutputFormat.class);
+ private HiveMetaStoreClient client;
+ private HiveConf hiveConf;
+
+ private static final String dbName = "hcatOutputFormatTestDB";
+ private static final String tblName = "hcatOutputFormatTestTable";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ hiveConf = new HiveConf(this.getClass());
+
+ try {
+ client = new HiveMetaStoreClient(hiveConf, null);
+
+ initTable();
+ } catch (Throwable e) {
+ LOG.error("Unable to open the metastore", e);
+ throw new Exception(e);
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ try {
+ super.tearDown();
+ client.dropTable(dbName, tblName);
+ client.dropDatabase(dbName);
+
+ client.close();
+ } catch (Throwable e) {
+ LOG.error("Unable to close metastore", e);
+ throw new Exception(e);
+ }
+ }
+
+ private void initTable() throws Exception {
+
+ try {
+ client.dropTable(dbName, tblName);
+ } catch (Exception e) {
+ }
+ try {
+ client.dropDatabase(dbName);
+ } catch (Exception e) {
+ }
+ client.createDatabase(new Database(dbName, "", null, null));
+ assertNotNull((client.getDatabase(dbName).getLocationUri()));
+
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("colname", serdeConstants.STRING_TYPE_NAME, ""));
+
+ Table tbl = new Table();
+ tbl.setDbName(dbName);
+ tbl.setTableName(tblName);
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(fields);
+ tbl.setSd(sd);
+
+ //sd.setLocation("hdfs://tmp");
+ sd.setInputFormat(RCFileInputFormat.class.getName());
+ sd.setOutputFormat(RCFileOutputFormat.class.getName());
+ sd.setParameters(new HashMap<String, String>());
+ sd.getParameters().put("test_param_1", "Use this for comments etc");
+ //sd.setBucketCols(new ArrayList<String>(2));
+ //sd.getBucketCols().add("name");
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(
+ org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+ tbl.setPartitionKeys(fields);
+
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tableParams.put("hcat.testarg", "testArgValue");
+
+ tbl.setParameters(tableParams);
+
+ client.createTable(tbl);
+ Path tblPath = new Path(client.getTable(dbName, tblName).getSd().getLocation());
+ assertTrue(tblPath.getFileSystem(hiveConf).mkdirs(new Path(tblPath, "colname=p1")));
+
+ }
+
+ public void testSetOutput() throws Exception {
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "test outputformat");
+
+ Map<String, String> partitionValues = new HashMap<String, String>();
+ partitionValues.put("colname", "p1");
+ //null server url means local mode
+ OutputJobInfo info = OutputJobInfo.create(dbName, tblName, partitionValues);
+
+ HCatOutputFormat.setOutput(job, info);
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(job);
+
+ assertNotNull(jobInfo.getTableInfo());
+ assertEquals(1, jobInfo.getPartitionValues().size());
+ assertEquals("p1", jobInfo.getPartitionValues().get("colname"));
+ assertEquals(1, jobInfo.getTableInfo().getDataColumns().getFields().size());
+ assertEquals("colname", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
+
+ publishTest(job);
+ }
+
+ public void publishTest(Job job) throws Exception {
+ OutputCommitter committer = new FileOutputCommitterContainer(job, null);
+ committer.commitJob(job);
+
+ Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
+ assertNotNull(part);
+
+ StorerInfo storer = InternalUtil.extractStorerInfo(part.getSd(), part.getParameters());
+ assertEquals(storer.getProperties().get("hcat.testarg"), "testArgValue");
+ assertTrue(part.getSd().getLocation().indexOf("p1") != -1);
+ }
+}
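
testSetOutput() exercises only the metadata side of HCatOutputFormat (setOutput plus a committer run); a real write job also needs a mapper that emits HCatRecords and a schema set via HCatOutputFormat.setSchema, as the MultiOutputFormat test earlier in this commit shows. A minimal single-table driver sketch assembled from calls that appear in this commit (the mapper and table names are placeholders):

    // Sketch of a minimal HCatOutputFormat write job; "example_*" names are hypothetical.
    Job job = new Job(new Configuration(), "hcat write example");
    job.setMapperClass(ExampleHCatWritingMapper.class);  // assumed mapper emitting HCatRecord values
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HCatOutputFormat.class);
    job.setNumReduceTasks(0);

    HCatOutputFormat.setOutput(job,
        OutputJobInfo.create("default", "example_table", null));  // null: no static partition values

    List<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
    cols.add(HCatSchemaUtils.getHCatFieldSchema(
        new FieldSchema("colname", serdeConstants.STRING_TYPE_NAME, "")));
    HCatOutputFormat.setSchema(job, new HCatSchema(cols));

    job.waitForCompletion(true);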
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.NoExitSecurityManager;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatPartitionPublish {
+ private static Configuration mrConf = null;
+ private static FileSystem fs = null;
+ private static MiniMRCluster mrCluster = null;
+ private static boolean isServerRunning = false;
+ private static final int msPort = 20101;
+ private static HiveConf hcatConf;
+ private static HiveMetaStoreClient msc;
+ private static SecurityManager securityManager;
+
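+ // Starts a single-node MiniMRCluster plus a standalone metastore on msPort
+ // and points the HiveConf used by the tests at that metastore.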
+ @BeforeClass
+ public static void setup() throws Exception {
+ Configuration conf = new Configuration(true);
+ conf.set("yarn.scheduler.capacity.root.queues", "default");
+ conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+
+ fs = FileSystem.get(conf);
+ System.setProperty("hadoop.log.dir", new File(fs.getWorkingDirectory()
+ .toString(), "/logs").getAbsolutePath());
+ // LocalJobRunner does not work with the mapreduce OutputCommitter, so a
+ // MiniMRCluster is needed. See MAPREDUCE-2350.
+ mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
+ mrConf = mrCluster.createJobConf();
+
+ if (isServerRunning) {
+ return;
+ }
+
+ MetaStoreUtils.startMetaStore(msPort, ShimLoader
+ .getHadoopThriftAuthBridge());
+ isServerRunning = true;
+ securityManager = System.getSecurityManager();
+ System.setSecurityManager(new NoExitSecurityManager());
+
+ hcatConf = new HiveConf(TestHCatPartitionPublish.class);
+ hcatConf.set("hive.metastore.local", "false");
+ hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+ + msPort);
+ hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+ hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES, 3);
+ hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
+ "false");
+ msc = new HiveMetaStoreClient(hcatConf, null);
+ System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+ System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ System.setSecurityManager(securityManager);
+ isServerRunning = false;
+ }
+
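+ // Runs a write job whose map task always fails and verifies that no
+ // partition is registered and the partition directory is cleaned up.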
+ @Test
+ public void testPartitionPublish() throws Exception {
+ String dbName = "default";
+ String tableName = "testHCatPartitionedTable";
+ createTable(null, tableName);
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value1");
+ partitionMap.put("part0", "p0value1");
+
+ ArrayList<HCatFieldSchema> hcatTableColumns = new ArrayList<HCatFieldSchema>();
+ for (FieldSchema fs : getTableColumns()) {
+ hcatTableColumns.add(HCatSchemaUtils.getHCatFieldSchema(fs));
+ }
+
+ runMRCreateFail(dbName, tableName, partitionMap, hcatTableColumns);
+ List<String> ptns = msc.listPartitionNames(dbName, tableName,
+ (short) 10);
+ Assert.assertEquals(0, ptns.size());
+ Table table = msc.getTable(dbName, tableName);
+ Assert.assertNotNull(table);
+ // Also make sure that the directory has been deleted in the table
+ // location.
+ Assert.assertFalse(fs.exists(new Path(table.getSd().getLocation()
+ + "/part1=p1value1/part0=p0value1")));
+ }
+
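+ // Submits a map-only HCatOutputFormat job that uses the always-failing
+ // MapFail mapper and asserts that the job does not succeed.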
+ void runMRCreateFail(
+ String dbName, String tableName, Map<String, String> partitionValues,
+ List<HCatFieldSchema> columns) throws Exception {
+
+ Job job = new Job(mrConf, "hcat mapreduce write fail test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(TestHCatPartitionPublish.MapFail.class);
+
+ // input/output settings
+ job.setInputFormatClass(TextInputFormat.class);
+
+ Path path = new Path(fs.getWorkingDirectory(),
+ "mapred/testHCatMapReduceInput");
+ // The write count does not matter, as the map will fail in its first
+ // call.
+ createInputFile(path, 5);
+
+ TextInputFormat.setInputPaths(job, path);
+ job.setOutputFormatClass(HCatOutputFormat.class);
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName,
+ partitionValues);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ HCatOutputFormat.setSchema(job, new HCatSchema(columns));
+
+ boolean success = job.waitForCompletion(true);
+ Assert.assertFalse(success);
+ }
+
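+ // Writes rowCount lines to the given path, replacing any existing file.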
+ private void createInputFile(Path path, int rowCount) throws IOException {
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ FSDataOutputStream os = fs.create(path);
+ for (int i = 0; i < rowCount; i++) {
+ os.writeChars(i + "\n");
+ }
+ os.close();
+ }
+
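+ // Mapper that throws on its first record to simulate a task failure.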
+ public static class MapFail extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ throw new IOException("Exception to mimic job failure.");
+ }
+ }
+
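+ // Creates an RCFile-backed managed table with the test columns and
+ // partition keys, dropping any previous copy first.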
+ private void createTable(String dbName, String tableName) throws Exception {
+ String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME
+ : dbName;
+ try {
+ msc.dropTable(databaseName, tableName);
+ } catch (Exception e) {
+ // Ignore: dropTable can fail with NoSuchObjectException when the table does not exist.
+ }
+
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType("MANAGED_TABLE");
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(getTableColumns());
+ tbl.setPartitionKeys(getPartitionKeys());
+ tbl.setSd(sd);
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName());
+ sd.setInputFormat(RCFileInputFormat.class.getName());
+ sd.setOutputFormat(RCFileOutputFormat.class.getName());
+
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tbl.setParameters(tableParams);
+
+ msc.createTable(tbl);
+ }
+
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ // Defining partition names in unsorted order
+ fields.add(new FieldSchema("PaRT1", serdeConstants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("part0", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHCatPartitioned extends HCatMapReduceTest {
+
+ private static List<HCatRecord> writeRecords;
+ private static List<HCatFieldSchema> partitionColumns;
+
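+ // Prepares 20 sample records and the two-field write schema (c1 int,
+ // c2 string) shared by the tests below.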
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+
+ tableName = "testHCatPartitionedTable";
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for (int i = 0; i < 20; i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("strvalue" + i);
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+ }
+
+
+ @Override
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ //Defining partition names in unsorted order
+ fields.add(new FieldSchema("PaRT1", serdeConstants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("part0", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ @Override
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+
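+ // Writes two partitions, exercises the duplicate-publish, invalid-key,
+ // missing-value and null-map error paths, reads the data back with
+ // partition filters, and then runs the schema, column order and Hive
+ // read checks.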
+ @Test
+ public void testHCatPartitionedTable() throws Exception {
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value1");
+ partitionMap.put("part0", "p0value1");
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
+ partitionMap.clear();
+ partitionMap.put("PART1", "p1value2");
+ partitionMap.put("PART0", "p0value2");
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+
+ //Test for duplicate publish
+ IOException exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType());
+
+ //Test for publish with invalid partition key name
+ exc = null;
+ partitionMap.clear();
+ partitionMap.put("px1", "p1value2");
+ partitionMap.put("px0", "p0value2");
+
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
+
+ //Test for publish with missing partition key values
+ exc = null;
+ partitionMap.clear();
+ partitionMap.put("px", "p1value2");
+
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+
+ //Test for null partition value map
+ exc = null;
+ try {
+ runMRCreate(null, partitionColumns, writeRecords, 20, false);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc == null);
+// assertTrue(exc instanceof HCatException);
+// assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
+ // With dynamic partitioning, it is no longer an error that no partition key values were specified
+
+ //Read should get 10 + 20 rows
+ runMRRead(30);
+
+ //Read with partition filter
+ runMRRead(10, "part1 = \"p1value1\"");
+ runMRRead(20, "part1 = \"p1value2\"");
+ runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+ runMRRead(10, "part0 = \"p0value1\"");
+ runMRRead(20, "part0 = \"p0value2\"");
+ runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+
+ tableSchemaTest();
+ columnOrderChangeTest();
+ hiveReadTest();
+ }
+
+
+ //Test that new columns get added to the table schema
+ private void tableSchemaTest() throws Exception {
+
+ HCatSchema tableSchema = getTableSchema();
+
+ assertEquals(4, tableSchema.getFields().size());
+
+ //Update partition schema to have 3 fields
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", serdeConstants.STRING_TYPE_NAME, "")));
+
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for (int i = 0; i < 20; i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("strvalue" + i);
+ objList.add("str2value" + i);
+
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value5");
+ partitionMap.put("part0", "p0value5");
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
+ tableSchema = getTableSchema();
+
+ //assert that c3 has got added to table schema
+ assertEquals(5, tableSchema.getFields().size());
+ assertEquals("c1", tableSchema.getFields().get(0).getName());
+ assertEquals("c2", tableSchema.getFields().get(1).getName());
+ assertEquals("c3", tableSchema.getFields().get(2).getName());
+ assertEquals("part1", tableSchema.getFields().get(3).getName());
+ assertEquals("part0", tableSchema.getFields().get(4).getName());
+
+ //Test that changing column data type fails
+ partitionMap.clear();
+ partitionMap.put("part1", "p1value6");
+ partitionMap.put("part0", "p0value6");
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.INT_TYPE_NAME, "")));
+
+ IOException exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType());
+
+ //Test that partition key is not allowed in data
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", serdeConstants.STRING_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", serdeConstants.STRING_TYPE_NAME, "")));
+
+ List<HCatRecord> recordsContainingPartitionCols = new ArrayList<HCatRecord>(20);
+ for (int i = 0; i < 20; i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("c2value" + i);
+ objList.add("c3value" + i);
+ objList.add("p1value6");
+
+ recordsContainingPartitionCols.add(new DefaultHCatRecord(objList));
+ }
+
+ exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ List<HCatRecord> records = runMRRead(20, "part1 = \"p1value6\"");
+ assertEquals(20, records.size());
+ records = runMRRead(20, "part0 = \"p0value6\"");
+ assertEquals(20, records.size());
+ Integer i = 0;
+ for (HCatRecord rec : records) {
+ assertEquals(5, rec.size());
+ assertTrue(rec.get(0).equals(i));
+ assertTrue(rec.get(1).equals("c2value" + i));
+ assertTrue(rec.get(2).equals("c3value" + i));
+ assertTrue(rec.get(3).equals("p1value6"));
+ assertTrue(rec.get(4).equals("p0value6"));
+ i++;
+ }
+ }
+
+ //Check behavior when changing the order of columns
+ private void columnOrderChangeTest() throws Exception {
+
+ HCatSchema tableSchema = getTableSchema();
+
+ assertEquals(5, tableSchema.getFields().size());
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", serdeConstants.STRING_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+
+
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for (int i = 0; i < 10; i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("co strvalue" + i);
+ objList.add("co str2value" + i);
+
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ Map<String, String> partitionMap = new HashMap<String, String>();
+ partitionMap.put("part1", "p1value8");
+ partitionMap.put("part0", "p0value8");
+
+ Exception exc = null;
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType());
+
+
+ partitionColumns = new ArrayList<HCatFieldSchema>();
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+ partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+
+ writeRecords = new ArrayList<HCatRecord>();
+
+ for (int i = 0; i < 10; i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("co strvalue" + i);
+
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
+
+ //Read should get 10 + 20 + 10 + 10 + 20 rows
+ runMRRead(70);
+ }
+
+ //Test that data inserted through HCatOutputFormat is readable from Hive
+ private void hiveReadTest() throws Exception {
+
+ String query = "select * from " + tableName;
+ int retCode = driver.run(query).getResponseCode();
+
+ if (retCode != 0) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(70, res.size());
+ }
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestInputJobInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestInputJobInfo.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestInputJobInfo.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestInputJobInfo.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class TestInputJobInfo extends HCatBaseTest {
+
+ @Test
+ public void test4ArgCreate() throws Exception {
+ Properties p = new Properties();
+ p.setProperty("key", "value");
+ InputJobInfo jobInfo = InputJobInfo.create("Db", "Table", "Filter", p);
+ Assert.assertEquals("Db", jobInfo.getDatabaseName());
+ Assert.assertEquals("Table", jobInfo.getTableName());
+ Assert.assertEquals("Filter", jobInfo.getFilter());
+ Assert.assertEquals("value", jobInfo.getProperties().getProperty("key"));
+ }
+
+ @Test
+ public void test3ArgCreate() throws Exception {
+ InputJobInfo jobInfo = InputJobInfo.create("Db", "Table", "Filter");
+ Assert.assertEquals("Db", jobInfo.getDatabaseName());
+ Assert.assertEquals("Table", jobInfo.getTableName());
+ Assert.assertEquals("Filter", jobInfo.getFilter());
+ Assert.assertEquals(0, jobInfo.getProperties().size());
+ }
+}