Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC
svn commit: r1520466 [10/18] - in /hive/trunk/hcatalog:
core/src/main/java/org/apache/hcatalog/cli/
core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/
core/src/main/java/org/apache/hcatalog/common/
core/src/main/java/org/apache/hcatalog/data/...
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestJsonSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestJsonSerDe.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestJsonSerDe.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestJsonSerDe.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestJsonSerDe extends TestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestJsonSerDe.class);
+
+ public List<Pair<Properties, HCatRecord>> getData() {
+ List<Pair<Properties, HCatRecord>> data = new ArrayList<Pair<Properties, HCatRecord>>();
+
+ List<Object> rlist = new ArrayList<Object>(13);
+ rlist.add(new Byte("123"));
+ rlist.add(new Short("456"));
+ rlist.add(new Integer(789));
+ rlist.add(new Long(1000L));
+ rlist.add(new Double(5.3D));
+ rlist.add(new Float(2.39F));
+ rlist.add("hcat and hadoop");
+ rlist.add(null);
+
+ List<Object> innerStruct = new ArrayList<Object>(2);
+ innerStruct.add("abc");
+ innerStruct.add("def");
+ rlist.add(innerStruct);
+
+ List<Integer> innerList = new ArrayList<Integer>();
+ innerList.add(314);
+ innerList.add(007);
+ rlist.add(innerList);
+
+ Map<Short, String> map = new HashMap<Short, String>(3);
+ map.put(new Short("2"), "hcat is cool");
+ map.put(new Short("3"), "is it?");
+ map.put(new Short("4"), "or is it not?");
+ rlist.add(map);
+
+ rlist.add(new Boolean(true));
+
+ List<Object> c1 = new ArrayList<Object>();
+ List<Object> c1_1 = new ArrayList<Object>();
+ c1_1.add(new Integer(12));
+ List<Object> i2 = new ArrayList<Object>();
+ List<Integer> ii1 = new ArrayList<Integer>();
+ ii1.add(new Integer(13));
+ ii1.add(new Integer(14));
+ i2.add(ii1);
+ Map<String, List<?>> ii2 = new HashMap<String, List<?>>();
+ List<Integer> iii1 = new ArrayList<Integer>();
+ iii1.add(new Integer(15));
+ ii2.put("phew", iii1);
+ i2.add(ii2);
+ c1_1.add(i2);
+ c1.add(c1_1);
+ rlist.add(c1);
+
+ List<Object> nlist = new ArrayList<Object>(13);
+ nlist.add(null); // tinyint
+ nlist.add(null); // smallint
+ nlist.add(null); // int
+ nlist.add(null); // bigint
+ nlist.add(null); // double
+ nlist.add(null); // float
+ nlist.add(null); // string
+ nlist.add(null); // string
+ nlist.add(null); // struct
+ nlist.add(null); // array
+ nlist.add(null); // map
+ nlist.add(null); // bool
+ nlist.add(null); // complex
+
+ String typeString =
+ "tinyint,smallint,int,bigint,double,float,string,string,"
+ + "struct<a:string,b:string>,array<int>,map<smallint,string>,boolean,"
+ + "array<struct<i1:int,i2:struct<ii1:array<int>,ii2:map<string,struct<iii1:int>>>>>";
+ Properties props = new Properties();
+
+ props.put(serdeConstants.LIST_COLUMNS, "ti,si,i,bi,d,f,s,n,r,l,m,b,c1");
+ props.put(serdeConstants.LIST_COLUMN_TYPES, typeString);
+// props.put(Constants.SERIALIZATION_NULL_FORMAT, "\\N");
+// props.put(Constants.SERIALIZATION_FORMAT, "1");
+
+ data.add(new Pair<Properties, HCatRecord>(props, new DefaultHCatRecord(rlist)));
+ data.add(new Pair<Properties, HCatRecord>(props, new DefaultHCatRecord(nlist)));
+ return data;
+ }
+
+ public void testRW() throws Exception {
+
+ Configuration conf = new Configuration();
+
+ for (Pair<Properties, HCatRecord> e : getData()) {
+ Properties tblProps = e.first;
+ HCatRecord r = e.second;
+
+ HCatRecordSerDe hrsd = new HCatRecordSerDe();
+ hrsd.initialize(conf, tblProps);
+
+ JsonSerDe jsde = new JsonSerDe();
+ jsde.initialize(conf, tblProps);
+
+ LOG.info("ORIG:{}", r);
+
+ Writable s = hrsd.serialize(r, hrsd.getObjectInspector());
+ LOG.info("ONE:{}", s);
+
+ Object o1 = hrsd.deserialize(s);
+ assertTrue(HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o1));
+
+ Writable s2 = jsde.serialize(o1, hrsd.getObjectInspector());
+ LOG.info("TWO:{}", s2);
+ Object o2 = jsde.deserialize(s2);
+ LOG.info("deserialized TWO : {} ", o2);
+
+ assertTrue(HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o2));
+ }
+
+ }
+
+ public void testRobustRead() throws Exception {
+ /**
+ * This test was added for HCATALOG-436: we write out columns with
+ * "internal column names" such as "_col0", but read them back with
+ * regular column names.
+ */
+
+ Configuration conf = new Configuration();
+
+ for (Pair<Properties, HCatRecord> e : getData()) {
+ Properties tblProps = e.first;
+ HCatRecord r = e.second;
+
+ Properties internalTblProps = new Properties();
+ for (Map.Entry pe : tblProps.entrySet()) {
+ if (!pe.getKey().equals(serdeConstants.LIST_COLUMNS)) {
+ internalTblProps.put(pe.getKey(), pe.getValue());
+ } else {
+ internalTblProps.put(pe.getKey(), getInternalNames((String) pe.getValue()));
+ }
+ }
+
+ LOG.info("orig tbl props:{}", tblProps);
+ LOG.info("modif tbl props:{}", internalTblProps);
+
+ JsonSerDe wjsd = new JsonSerDe();
+ wjsd.initialize(conf, internalTblProps);
+
+ JsonSerDe rjsd = new JsonSerDe();
+ rjsd.initialize(conf, tblProps);
+
+ LOG.info("ORIG:{}", r);
+
+ Writable s = wjsd.serialize(r, wjsd.getObjectInspector());
+ LOG.info("ONE:{}", s);
+
+ Object o1 = wjsd.deserialize(s);
+ LOG.info("deserialized ONE : {} ", o1);
+
+ Object o2 = rjsd.deserialize(s);
+ LOG.info("deserialized TWO : {} ", o2);
+ assertTrue(HCatDataCheckUtil.recordsEqual(r, (HCatRecord) o2));
+ }
+
+ }
+
+ String getInternalNames(String columnNames) {
+ if (columnNames == null) {
+ return null;
+ }
+ if (columnNames.isEmpty()) {
+ return "";
+ }
+
+ StringBuilder sb = new StringBuilder();
+ int numStrings = columnNames.split(",").length;
+ sb.append("_col0");
+ for (int i = 1; i < numStrings; i++) {
+ sb.append(",");
+ sb.append(HiveConf.getColumnInternalName(i));
+ }
+ return sb.toString();
+ }
+}
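
For readers skimming the patch, the round trip this test exercises boils down to the following sketch (illustrative only, not part of the patch; it assumes a Properties object carrying LIST_COLUMNS/LIST_COLUMN_TYPES as built in getData() above, and a record matching that schema):

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Writable;

    // Hypothetical helper: serialize an HCatRecord to JSON text and back.
    static HCatRecord jsonRoundTrip(Properties tblProps, HCatRecord record) throws Exception {
      JsonSerDe serde = new JsonSerDe();
      serde.initialize(new Configuration(), tblProps); // schema comes from tblProps
      Writable json = serde.serialize(record, serde.getObjectInspector());
      return (HCatRecord) serde.deserialize(json);
    }

As the test asserts, records that survive this round trip compare equal via HCatDataCheckUtil.recordsEqual(), including nulls and nested struct/array/map values.
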
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestLazyHCatRecord.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestLazyHCatRecord.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestLazyHCatRecord.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestLazyHCatRecord.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestLazyHCatRecord {
+
+ private final int INT_CONST = 789;
+ private final long LONG_CONST = 5000000000L;
+ private final double DOUBLE_CONST = 3.141592654;
+ private final String STRING_CONST = "hello world";
+
+ @Test
+ public void testGet() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector());
+ Assert.assertEquals(INT_CONST, ((Integer) r.get(0)).intValue());
+ Assert.assertEquals(LONG_CONST, ((Long) r.get(1)).longValue());
+ Assert.assertEquals(DOUBLE_CONST, ((Double) r.get(2)).doubleValue(), 0);
+ Assert.assertEquals(STRING_CONST, (String) r.get(3));
+ }
+
+ @Test
+ public void testGetWithName() throws Exception {
+ TypeInfo ti = getTypeInfo();
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector(ti));
+ HCatSchema schema = HCatSchemaUtils.getHCatSchema(ti)
+ .get(0).getStructSubSchema();
+ Assert.assertEquals(INT_CONST, ((Integer) r.get("an_int", schema)).intValue());
+ Assert.assertEquals(LONG_CONST, ((Long) r.get("a_long", schema)).longValue());
+ Assert.assertEquals(DOUBLE_CONST, ((Double) r.get("a_double", schema)).doubleValue(), 0);
+ Assert.assertEquals(STRING_CONST, (String) r.get("a_string", schema));
+ }
+
+ @Test
+ public void testGetAll() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector());
+ List<Object> list = r.getAll();
+ Assert.assertEquals(INT_CONST, ((Integer) list.get(0)).intValue());
+ Assert.assertEquals(LONG_CONST, ((Long) list.get(1)).longValue());
+ Assert.assertEquals(DOUBLE_CONST, ((Double) list.get(2)).doubleValue(), 0);
+ Assert.assertEquals(STRING_CONST, (String) list.get(3));
+ }
+
+ @Test
+ public void testSet() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector());
+ boolean sawException = false;
+ try {
+ r.set(3, "Mary had a little lamb");
+ } catch (UnsupportedOperationException uoe) {
+ sawException = true;
+ }
+ Assert.assertTrue(sawException);
+ }
+
+ @Test
+ public void testSize() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector());
+ Assert.assertEquals(4, r.size());
+ }
+
+ @Test
+ public void testReadFields() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector());
+ boolean sawException = false;
+ try {
+ r.readFields(null);
+ } catch (UnsupportedOperationException uoe) {
+ sawException = true;
+ }
+ Assert.assertTrue(sawException);
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector());
+ boolean sawException = false;
+ try {
+ r.write(null);
+ } catch (UnsupportedOperationException uoe) {
+ sawException = true;
+ }
+ Assert.assertTrue(sawException);
+ }
+
+ @Test
+ public void testSetWithName() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector());
+ boolean sawException = false;
+ try {
+ r.set("fred", null, "bob");
+ } catch (UnsupportedOperationException uoe) {
+ sawException = true;
+ }
+ Assert.assertTrue(sawException);
+ }
+
+ @Test
+ public void testRemove() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector());
+ boolean sawException = false;
+ try {
+ r.remove(0);
+ } catch (UnsupportedOperationException uoe) {
+ sawException = true;
+ }
+ Assert.assertTrue(sawException);
+ }
+
+ @Test
+ public void testCopy() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector());
+ boolean sawException = false;
+ try {
+ r.copy(null);
+ } catch (UnsupportedOperationException uoe) {
+ sawException = true;
+ }
+ Assert.assertTrue(sawException);
+ }
+
+ @Test
+ public void testGetWritable() throws Exception {
+ HCatRecord r = new LazyHCatRecord(getHCatRecord(), getObjectInspector()).getWritable();
+ Assert.assertEquals(INT_CONST, ((Integer) r.get(0)).intValue());
+ Assert.assertEquals(LONG_CONST, ((Long) r.get(1)).longValue());
+ Assert.assertEquals(DOUBLE_CONST, ((Double) r.get(2)).doubleValue(), 0);
+ Assert.assertEquals(STRING_CONST, (String) r.get(3));
+ Assert.assertEquals("org.apache.hcatalog.data.DefaultHCatRecord", r.getClass().getName());
+ }
+
+ private HCatRecord getHCatRecord() throws Exception {
+ List<Object> rec_1 = new ArrayList<Object>(4);
+ rec_1.add(new Integer(INT_CONST));
+ rec_1.add(new Long(LONG_CONST));
+ rec_1.add(new Double(DOUBLE_CONST));
+ rec_1.add(STRING_CONST);
+
+ return new DefaultHCatRecord(rec_1);
+ }
+
+ private TypeInfo getTypeInfo() throws Exception {
+ List<String> names = new ArrayList<String>(4);
+ names.add("an_int");
+ names.add("a_long");
+ names.add("a_double");
+ names.add("a_string");
+
+ List<TypeInfo> tis = new ArrayList<TypeInfo>(4);
+ tis.add(TypeInfoFactory.getPrimitiveTypeInfo("int"));
+ tis.add(TypeInfoFactory.getPrimitiveTypeInfo("bigint"));
+ tis.add(TypeInfoFactory.getPrimitiveTypeInfo("double"));
+ tis.add(TypeInfoFactory.getPrimitiveTypeInfo("string"));
+
+ return TypeInfoFactory.getStructTypeInfo(names, tis);
+ }
+
+ private ObjectInspector getObjectInspector(TypeInfo ti) throws Exception {
+ return HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector((StructTypeInfo)ti);
+ }
+
+ private ObjectInspector getObjectInspector() throws Exception {
+ return HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(
+ (StructTypeInfo)getTypeInfo());
+ }
+}
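
The class under test adapts an arbitrary object plus its ObjectInspector to the read-only HCatRecord interface: fields are decoded lazily on access, all mutators throw UnsupportedOperationException, and getWritable() materializes an eager DefaultHCatRecord. A minimal usage sketch (assuming a rawStruct object and a matching ObjectInspector oi, as produced by getHCatRecord()/getObjectInspector() above):

    HCatRecord lazy = new LazyHCatRecord(rawStruct, oi); // no field copying yet
    Object first = lazy.get(0);                          // decoded on access
    HCatRecord eager = lazy.getWritable();               // DefaultHCatRecord, safe to write out
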
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestReaderWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestReaderWriter.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestReaderWriter.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/TestReaderWriter.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.ReadEntity;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+import org.apache.hcatalog.data.transfer.WriteEntity;
+import org.apache.hcatalog.data.transfer.WriterContext;
+import org.apache.hcatalog.mapreduce.HCatBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestReaderWriter extends HCatBaseTest {
+
+ @Test
+ public void test() throws MetaException, CommandNeedRetryException,
+ IOException, ClassNotFoundException {
+
+ driver.run("drop table mytbl");
+ driver.run("create table mytbl (a string, b int)");
+ Iterator<Entry<String, String>> itr = hiveConf.iterator();
+ Map<String, String> map = new HashMap<String, String>();
+ while (itr.hasNext()) {
+ Entry<String, String> kv = itr.next();
+ map.put(kv.getKey(), kv.getValue());
+ }
+
+ WriterContext cntxt = runsInMaster(map);
+
+ File writeCntxtFile = File.createTempFile("hcat-write", "temp");
+ writeCntxtFile.deleteOnExit();
+
+ // Serialize context.
+ ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
+ oos.writeObject(cntxt);
+ oos.flush();
+ oos.close();
+
+ // Now, deserialize it.
+ ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
+ cntxt = (WriterContext) ois.readObject();
+ ois.close();
+
+ runsInSlave(cntxt);
+ commit(map, true, cntxt);
+
+ ReaderContext readCntxt = runsInMaster(map, false);
+
+ File readCntxtFile = File.createTempFile("hcat-read", "temp");
+ readCntxtFile.deleteOnExit();
+ oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
+ oos.writeObject(readCntxt);
+ oos.flush();
+ oos.close();
+
+ ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
+ readCntxt = (ReaderContext) ois.readObject();
+ ois.close();
+
+ for (InputSplit split : readCntxt.getSplits()) {
+ runsInSlave(split, readCntxt.getConf());
+ }
+ }
+
+ private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
+
+ WriteEntity.Builder builder = new WriteEntity.Builder();
+ WriteEntity entity = builder.withTable("mytbl").build();
+ HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+ WriterContext info = writer.prepareWrite();
+ return info;
+ }
+
+ private ReaderContext runsInMaster(Map<String, String> config, boolean bogus)
+ throws HCatException {
+ ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
+ HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
+ ReaderContext cntxt = reader.prepareRead();
+ return cntxt;
+ }
+
+ private void runsInSlave(InputSplit split, Configuration config) throws HCatException {
+
+ HCatReader reader = DataTransferFactory.getHCatReader(split, config);
+ Iterator<HCatRecord> itr = reader.read();
+ int i = 1;
+ while (itr.hasNext()) {
+ HCatRecord read = itr.next();
+ HCatRecord written = getRecord(i++);
+ // Argh, HCatRecord doesn't implement equals()
+ Assert.assertTrue("Read: " + read.get(0) + " Written: " + written.get(0),
+ written.get(0).equals(read.get(0)));
+ Assert.assertTrue("Read: " + read.get(1) + " Written: " + written.get(1),
+ written.get(1).equals(read.get(1)));
+ Assert.assertEquals(2, read.size());
+ }
+ //Assert.assertFalse(itr.hasNext());
+ }
+
+ private void runsInSlave(WriterContext context) throws HCatException {
+
+ HCatWriter writer = DataTransferFactory.getHCatWriter(context);
+ writer.write(new HCatRecordItr());
+ }
+
+ private void commit(Map<String, String> config, boolean status,
+ WriterContext context) throws IOException {
+
+ WriteEntity.Builder builder = new WriteEntity.Builder();
+ WriteEntity entity = builder.withTable("mytbl").build();
+ HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+ if (status) {
+ writer.commit(context);
+ } else {
+ writer.abort(context);
+ }
+ }
+
+ private static HCatRecord getRecord(int i) {
+ List<Object> list = new ArrayList<Object>(2);
+ list.add("Row #: " + i);
+ list.add(i);
+ return new DefaultHCatRecord(list);
+ }
+
+ private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+ int i = 0;
+
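+ // Caution: hasNext() advances i as a side effect, so it assumes the
+ // caller invokes it exactly once before each next().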
+ @Override
+ public boolean hasNext() {
+ return i++ < 100;
+ }
+
+ @Override
+ public HCatRecord next() {
+ return getRecord(i);
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException();
+ }
+ }
+}
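
The test encodes the intended deployment pattern for the data transfer API: a master process calls prepareWrite()/prepareRead() to obtain a context, the context (which is Serializable, hence the ObjectOutputStream round trip above) is shipped to slave processes that do the actual record I/O, and the master then commits or aborts. Roughly (a sketch under the same assumptions as the test, i.e. a table named "mytbl" and a config map taken from the HiveConf):

    import java.util.Iterator;
    import java.util.Map;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.transfer.*;

    // Hypothetical outline, not part of the patch.
    static void writeViaTransferApi(Map<String, String> config,
                                    Iterator<HCatRecord> records) throws Exception {
      // Master: prepare a context describing the write.
      WriteEntity entity = new WriteEntity.Builder().withTable("mytbl").build();
      HCatWriter master = DataTransferFactory.getHCatWriter(entity, config);
      WriterContext ctx = master.prepareWrite();

      // Slave(s): normally the context is serialized and shipped first,
      // exactly as the test does with ObjectOutputStream.
      DataTransferFactory.getHCatWriter(ctx).write(records);

      // Master again: make the written data visible.
      master.commit(ctx);
    }
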
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/schema/TestHCatSchema.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/schema/TestHCatSchema.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/schema/TestHCatSchema.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/schema/TestHCatSchema.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.data.schema;
+
+import junit.framework.TestCase;
+import org.apache.hcatalog.common.HCatException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestHCatSchema extends TestCase {
+ public void testCannotAddFieldMoreThanOnce() throws HCatException {
+ List<HCatFieldSchema> fieldSchemaList = new ArrayList<HCatFieldSchema>();
+ fieldSchemaList.add(new HCatFieldSchema("name", HCatFieldSchema.Type.STRING, "What's your handle?"));
+ fieldSchemaList.add(new HCatFieldSchema("age", HCatFieldSchema.Type.INT, "So very old"));
+
+ HCatSchema schema = new HCatSchema(fieldSchemaList);
+
+ assertTrue(schema.getFieldNames().contains("age"));
+ assertEquals(2, schema.getFields().size());
+
+ try {
+ schema.append(new HCatFieldSchema("age", HCatFieldSchema.Type.INT, "So very old"));
+ fail("Was able to append field schema with same name");
+ } catch (HCatException he) {
+ assertTrue(he.getMessage().contains("Attempt to append HCatFieldSchema with already existing name: age."));
+ }
+
+ assertTrue(schema.getFieldNames().contains("age"));
+ assertEquals(2, schema.getFields().size());
+
+ // Should also not be able to add fields of different types with same name
+ try {
+ schema.append(new HCatFieldSchema("age", HCatFieldSchema.Type.STRING, "Maybe spelled out?"));
+ fail("Was able to append field schema with same name");
+ } catch (HCatException he) {
+ assertTrue(he.getMessage().contains("Attempt to append HCatFieldSchema with already existing name: age."));
+ }
+
+ assertTrue(schema.getFieldNames().contains("age"));
+ assertEquals(2, schema.getFields().size());
+ }
+
+ public void testCannotInstantiateSchemaWithRepeatedFieldNames() throws HCatException {
+ List<HCatFieldSchema> fieldSchemaList = new ArrayList<HCatFieldSchema>();
+
+ fieldSchemaList.add(new HCatFieldSchema("memberID", HCatFieldSchema.Type.INT, "as a number"));
+ fieldSchemaList.add(new HCatFieldSchema("location", HCatFieldSchema.Type.STRING, "there's Waldo"));
+
+ // No duplicate names. This should be ok
+ HCatSchema schema = new HCatSchema(fieldSchemaList);
+
+ fieldSchemaList.add(new HCatFieldSchema("memberID", HCatFieldSchema.Type.STRING, "as a String"));
+
+ // Now a duplicated field name. Should fail
+ try {
+ HCatSchema schema2 = new HCatSchema(fieldSchemaList);
+ fail("Able to add duplicate field name");
+ } catch (IllegalArgumentException iae) {
+ assertTrue(iae.getMessage().contains("Field named memberID already exists"));
+ }
+ }
+}
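
Both tests pin down the same invariant: field names in an HCatSchema are unique, enforced at construction time and on append(). A compact sketch of the append() path (same API as the test; the empty comment strings are placeholders):

    HCatSchema s = new HCatSchema(new ArrayList<HCatFieldSchema>());
    s.append(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, ""));
    s.append(new HCatFieldSchema("id", HCatFieldSchema.Type.STRING, "")); // throws HCatException: name already exists
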
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.data.schema;
+
+import java.io.PrintStream;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHCatSchemaUtils extends TestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestHCatSchemaUtils.class);
+
+ public void testSimpleOperation() throws Exception {
+ String typeString = "struct<name:string,studentid:int,"
+ + "contact:struct<phno:string,email:string>,"
+ + "currently_registered_courses:array<string>,"
+ + "current_grades:map<string,string>,"
+ + "phnos:array<struct<phno:string,type:string>>,blah:array<int>>";
+
+ TypeInfo ti = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
+
+ HCatSchema hsch = HCatSchemaUtils.getHCatSchemaFromTypeString(typeString);
+ LOG.info("Type name : {}", ti.getTypeName());
+ LOG.info("HCatSchema : {}", hsch);
+ assertEquals(hsch.size(), 1);
+ assertEquals(ti.getTypeName(), hsch.get(0).getTypeString());
+ assertEquals(hsch.get(0).getTypeString(), typeString);
+ }
+
+ @SuppressWarnings("unused")
+ private void pretty_print(PrintStream pout, HCatSchema hsch) throws HCatException {
+ pretty_print(pout, hsch, "");
+ }
+
+
+ private void pretty_print(PrintStream pout, HCatSchema hsch, String prefix) throws HCatException {
+ int i = 0;
+ for (HCatFieldSchema field : hsch.getFields()) {
+ pretty_print(pout, field, prefix + "." + (field.getName() == null ? i : field.getName()));
+ i++;
+ }
+ }
+
+ private void pretty_print(PrintStream pout, HCatFieldSchema hfsch, String prefix) throws HCatException {
+
+ Category tcat = hfsch.getCategory();
+ if (Category.STRUCT == tcat) {
+ pretty_print(pout, hfsch.getStructSubSchema(), prefix);
+ } else if (Category.ARRAY == tcat) {
+ pretty_print(pout, hfsch.getArrayElementSchema(), prefix);
+ } else if (Category.MAP == tcat) {
+ pout.println(prefix + ".mapkey:\t" + hfsch.getMapKeyType().toString());
+ pretty_print(pout, hfsch.getMapValueSchema(), prefix + ".mapvalue:");
+ } else {
+ pout.println(prefix + "\t" + hfsch.getType().toString());
+ }
+ }
+
+}
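
Since the conversion is driven entirely by Hive type strings, the same call handles any valid type literal, not just the large struct above. For instance (a sketch):

    HCatSchema s = HCatSchemaUtils.getHCatSchemaFromTypeString("struct<id:int,tags:array<string>>");
    // s.size() == 1, and s.get(0).getTypeString() round-trips to the input string.
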
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/fileformats/TestOrcDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/fileformats/TestOrcDynamicPartitioned.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/fileformats/TestOrcDynamicPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/fileformats/TestOrcDynamicPartitioned.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.fileformats;
+
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hcatalog.mapreduce.TestHCatDynamicPartitioned;
+import org.junit.BeforeClass;
+
+public class TestOrcDynamicPartitioned extends TestHCatDynamicPartitioned {
+
+ @BeforeClass
+ public static void generateInputData() throws Exception {
+ tableName = "testOrcDynamicPartitionedTable";
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ generateDataColumns();
+ }
+
+ @Override
+ protected String inputFormat() {
+ return OrcInputFormat.class.getName();
+ }
+
+ @Override
+ protected String outputFormat() {
+ return OrcOutputFormat.class.getName();
+ }
+
+ @Override
+ protected String serdeClass() {
+ return OrcSerde.class.getName();
+ }
+
+}
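
The ORC variant inherits every test from TestHCatDynamicPartitioned and swaps only the storage format through the three overrides, so other formats can be plugged in the same way. A hypothetical sketch for some other format (MyInputFormat/MyOutputFormat/MySerDe are placeholder names, not real classes; a real subclass would also set its own tableName in a @BeforeClass method, as the ORC class does):

    public class TestMyFormatDynamicPartitioned extends TestHCatDynamicPartitioned {
      @Override protected String inputFormat()  { return MyInputFormat.class.getName(); }
      @Override protected String outputFormat() { return MyOutputFormat.class.getName(); }
      @Override protected String serdeClass()   { return MySerDe.class.getName(); }
    }
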
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java.broken (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java.broken Fri Sep 6 00:49:14 2013
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.data.HCatDataCheckUtil;
+import org.apache.hcatalog.mapred.HCatMapredInputFormat;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.util.UDFContext;
+
+public class TestHiveHCatInputFormat extends TestCase {
+ private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static Driver driver;
+
+ String PTNED_TABLE = "junit_testhiveinputintegration_ptni";
+ String UNPTNED_TABLE = "junit_testhiveinputintegration_noptn";
+ String basicFile = "/tmp/"+PTNED_TABLE+".file";
+
+ public void testFromHive() throws Exception {
+ if (driver == null){
+ driver = HCatDataCheckUtil.instantiateDriver(cluster);
+ }
+
+ Properties props = new Properties();
+ props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+ String basicFileFullName = cluster.getProperties().getProperty("fs.default.name") + basicFile;
+
+ cleanup();
+
+ // create source data file
+ HCatDataCheckUtil.generateDataFile(cluster,basicFile);
+
+ String createPtnedTable = "(j int, s string) partitioned by (i int) "
+ +"stored by '"+HCatStorageHandlerImpl.class.getName()+"' tblproperties"
+ + "('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
+ + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+
+ HCatDataCheckUtil.createTable(driver,PTNED_TABLE,createPtnedTable);
+
+ String createUnptnedTable = "(i int, j int, s string) "
+ +"stored by '"+HCatStorageHandlerImpl.class.getName()+"' tblproperties"
+ + "('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
+ + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+
+ HCatDataCheckUtil.createTable(driver,UNPTNED_TABLE,createUnptnedTable);
+
+
+ driver.run("describe extended "+UNPTNED_TABLE);
+ ArrayList<String> des_values = new ArrayList<String>();
+ driver.getResults(des_values);
+ for (String s : des_values){
+ System.err.println("du:"+s);
+ }
+
+ driver.run("describe extended "+PTNED_TABLE);
+ ArrayList<String> des2_values = new ArrayList<String>();
+ driver.getResults(des2_values);
+ for (String s : des2_values){
+ System.err.println("dp:"+s);
+ }
+
+ // use pig to read from source file and put into this table
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+basicFileFullName+"' as (i:int, j:int, s:chararray);");
+ server.registerQuery("store A into '"+UNPTNED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+ server.executeBatch();
+
+ server.setBatchOn();
+ server.registerQuery("A = load '"+basicFileFullName+"' as (i:int, j:int, s:chararray);");
+ server.registerQuery("store A into '"+PTNED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+ server.executeBatch();
+
+ // partitioned by i
+ // select * from tbl;
+ // select j,s,i from tbl;
+ // select * from tbl where i = 3;
+ // select j,s,i from tbl where i = 3;
+ // select * from tbl where j = 3;
+ // select j,s,i from tbl where j = 3;
+
+ ArrayList<String> p_select_star_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "p_select_star_nofilter","select * from "+PTNED_TABLE);
+ ArrayList<String> p_select_named_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "p_select_named_nofilter","select j,s,i from "+PTNED_TABLE);
+
+ assertDataIdentical(p_select_star_nofilter,p_select_named_nofilter,50);
+
+ ArrayList<String> p_select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "p_select_star_ptnfilter","select * from "+PTNED_TABLE+" where i = 3");
+ ArrayList<String> p_select_named_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "p_select_named_ptnfilter","select j,s,i from "+PTNED_TABLE+" where i = 3");
+
+ assertDataIdentical(p_select_star_ptnfilter,p_select_named_ptnfilter,10);
+
+ ArrayList<String> select_star_nonptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_star_nonptnfilter","select * from "+PTNED_TABLE+" where j = 28");
+ ArrayList<String> select_named_nonptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_named_nonptnfilter","select j,s,i from "+PTNED_TABLE+" where j = 28");
+
+ assertDataIdentical(select_star_nonptnfilter,select_named_nonptnfilter,1);
+
+ // non-partitioned
+ // select * from tbl;
+ // select i,j,s from tbl;
+ // select * from tbl where i = 3;
+ // select i,j,s from tbl where i = 3;
+
+ // select j,s,i from tbl;
+ // select j,s,i from tbl where i = 3;
+
+ ArrayList<String> select_star_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_star_nofilter","select * from "+UNPTNED_TABLE); //i,j,s select * order is diff for unptn
+ ArrayList<String> select_ijs_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_ijs_nofilter","select i,j,s from "+UNPTNED_TABLE);
+
+ assertDataIdentical(select_star_nofilter,select_ijs_nofilter,50);
+
+ ArrayList<String> select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_star_ptnfilter","select * from "+UNPTNED_TABLE+" where i = 3"); //i,j,s
+ ArrayList<String> select_ijs_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_ijs_ptnfilter","select i,j,s from "+UNPTNED_TABLE+" where i = 3");
+
+ assertDataIdentical(select_star_ptnfilter,select_ijs_ptnfilter,10);
+
+ ArrayList<String> select_jsi_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_jsi_nofilter","select j,s,i from "+UNPTNED_TABLE);
+ assertDataIdentical(p_select_named_nofilter,select_jsi_nofilter,50,true);
+
+ ArrayList<String> select_jsi_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_jsi_ptnfilter","select j,s,i from "+UNPTNED_TABLE+" where i = 3");
+ assertDataIdentical(p_select_named_ptnfilter,select_jsi_ptnfilter,10,true);
+
+ }
+
+ private void assertDataIdentical(ArrayList<String> result1,
+ ArrayList<String> result2, int numRecords) {
+ assertDataIdentical(result1,result2,numRecords,false);
+ }
+
+ private void assertDataIdentical(ArrayList<String> result1,
+ ArrayList<String> result2, int numRecords,boolean doSort) {
+ assertEquals(numRecords, result1.size());
+ assertEquals(numRecords, result2.size());
+ Collections.sort(result1);
+ Collections.sort(result2);
+ for (int i = 0; i < numRecords; i++){
+ assertEquals(result1.get(i),result2.get(i));
+ }
+ }
+
+
+ private void cleanup() throws IOException, CommandNeedRetryException {
+ MiniCluster.deleteFile(cluster, basicFile);
+ HCatDataCheckUtil.dropTable(driver,PTNED_TABLE);
+ HCatDataCheckUtil.dropTable(driver,UNPTNED_TABLE);
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatBaseTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatBaseTest.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatBaseTest.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatBaseTest.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.pig.PigServer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Simplify writing HCatalog tests that require a HiveMetaStore.
+ */
+public class HCatBaseTest {
+ protected static final Logger LOG = LoggerFactory.getLogger(HCatBaseTest.class);
+ protected static final String TEST_DATA_DIR = System.getProperty("user.dir") +
+ "/build/test/data/" + HCatBaseTest.class.getCanonicalName();
+ protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+
+ protected HiveConf hiveConf = null;
+ protected Driver driver = null;
+ protected HiveMetaStoreClient client = null;
+
+ @BeforeClass
+ public static void setUpTestDataDir() throws Exception {
+ LOG.info("Using warehouse directory " + TEST_WAREHOUSE_DIR);
+ File f = new File(TEST_WAREHOUSE_DIR);
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ Assert.assertTrue(new File(TEST_WAREHOUSE_DIR).mkdirs());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ if (driver == null) {
+ setUpHiveConf();
+ driver = new Driver(hiveConf);
+ client = new HiveMetaStoreClient(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ }
+ }
+
+ /**
+ * Create a new HiveConf and set properties necessary for unit tests.
+ */
+ protected void setUpHiveConf() {
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+ hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+ }
+
+ protected void logAndRegister(PigServer server, String query) throws IOException {
+ LOG.info("Registering pig query: " + query);
+ server.registerQuery(query);
+ }
+}
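
Subclasses get a warm Driver, HiveMetaStoreClient, and HiveConf pointed at an isolated warehouse directory, so a typical test body is just DDL plus assertions. A minimal sketch of a subclass (illustrative, not part of the patch; the table name t1 is arbitrary):

    import org.apache.hcatalog.mapreduce.HCatBaseTest;
    import org.junit.Assert;
    import org.junit.Test;

    public class TestMyFeature extends HCatBaseTest {
      @Test
      public void tableRoundTrip() throws Exception {
        driver.run("create table t1 (a int)");
        Assert.assertNotNull(client.getTable("default", "t1"));
        driver.run("drop table t1");
      }
    }
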
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base test for HCatOutputFormat: writes a partition using HCatOutputFormat,
+ * reads it back using HCatInputFormat, and checks the column values and counts.
+ */
+public abstract class HCatMapReduceTest extends HCatBaseTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
+ protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+ protected static String tableName = "testHCatMapReduceTable";
+
+ private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
+ private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
+
+ protected abstract List<FieldSchema> getPartitionKeys();
+
+ protected abstract List<FieldSchema> getTableColumns();
+
+ private static FileSystem fs;
+
+ protected String inputFormat() {
+ return RCFileInputFormat.class.getName();
+ }
+
+ protected String outputFormat() {
+ return RCFileOutputFormat.class.getName();
+ }
+
+ protected String serdeClass() {
+ return ColumnarSerDe.class.getName();
+ }
+
+ @BeforeClass
+ public static void setUpOneTime() throws Exception {
+ fs = new LocalFileSystem();
+ fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 0);
+ // Hack: initialize the cache with a 0 expiry time so it returns a new hive client every time.
+ // Otherwise the cache doesn't play well with the second test method, because the client
+ // gets closed() in the tearDown() of the previous test.
+ HCatUtil.getHiveClient(hiveConf);
+
+ MapCreate.writeCount = 0;
+ MapRead.readCount = 0;
+ }
+
+ @After
+ public void deleteTable() throws Exception {
+ try {
+ String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+
+ client.dropTable(databaseName, tableName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Before
+ public void createTable() throws Exception {
+ String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+
+ try {
+ client.dropTable(databaseName, tableName);
+ } catch (Exception e) {
+ // can fail with NoSuchObjectException; ignored, the table may simply not exist yet
+ }
+
+
+ Table tbl = new Table();
+ tbl.setDbName(databaseName);
+ tbl.setTableName(tableName);
+ tbl.setTableType("MANAGED_TABLE");
+ StorageDescriptor sd = new StorageDescriptor();
+
+ sd.setCols(getTableColumns());
+ tbl.setPartitionKeys(getPartitionKeys());
+
+ tbl.setSd(sd);
+
+ sd.setBucketCols(new ArrayList<String>(2));
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(serdeClass());
+ sd.setInputFormat(inputFormat());
+ sd.setOutputFormat(outputFormat());
+
+ Map<String, String> tableParams = new HashMap<String, String>();
+ tbl.setParameters(tableParams);
+
+ client.createTable(tbl);
+ }
+
+ //Create test input file with specified number of rows
+ private void createInputFile(Path path, int rowCount) throws IOException {
+
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+
+ FSDataOutputStream os = fs.create(path);
+
+ for (int i = 0; i < rowCount; i++) {
+ os.writeChars(i + "\n");
+ }
+
+ os.close();
+ }
+
+ public static class MapCreate extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ static int writeCount = 0; //test will be in local mode
+
+ @Override
+ public void map(LongWritable key, Text value, Context context
+ ) throws IOException, InterruptedException {
+ try {
+ HCatRecord rec = writeRecords.get(writeCount);
+ context.write(null, rec);
+ writeCount++;
+
+ } catch (Exception e) {
+
+ e.printStackTrace(System.err); //print since otherwise exception is lost
+ throw new IOException(e);
+ }
+ }
+ }
+
+ public static class MapRead extends
+ Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+
+ static int readCount = 0; //test will be in local mode
+
+ @Override
+ public void map(WritableComparable key, HCatRecord value, Context context
+ ) throws IOException, InterruptedException {
+ try {
+ readRecords.add(value);
+ readCount++;
+ } catch (Exception e) {
+ e.printStackTrace(); //print since otherwise exception is lost
+ throw new IOException(e);
+ }
+ }
+ }
+
+ Job runMRCreate(Map<String, String> partitionValues,
+ List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+ int writeCount, boolean assertWrite) throws Exception {
+ return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite, true);
+ }
+
+ /**
+ * Run a local map reduce job to load data from in-memory records into an HCatalog table.
+ * @param partitionValues static partition spec, or null for dynamic partitioning
+ * @param partitionColumns schema of the records being written
+ * @param records data to be written to the HCatalog table
+ * @param writeCount number of records to write
+ * @param assertWrite whether to assert that all records were written
+ * @param asSingleMapTask whether to run the write as a single map task
+ * @return the completed Job
+ * @throws Exception
+ */
+ Job runMRCreate(Map<String, String> partitionValues,
+ List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+ int writeCount, boolean assertWrite, boolean asSingleMapTask) throws Exception {
+
+ writeRecords = records;
+ MapCreate.writeCount = 0;
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "hcat mapreduce write test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatMapReduceTest.MapCreate.class);
+
+ // input/output settings
+ job.setInputFormatClass(TextInputFormat.class);
+
+ if (asSingleMapTask) {
+ // One input path would mean only one map task
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+ createInputFile(path, writeCount);
+ TextInputFormat.setInputPaths(job, path);
+ } else {
+ // Create two input paths so that two map tasks get triggered. There could be other ways
+ // to trigger two map tasks.
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+ createInputFile(path, writeCount / 2);
+
+ Path path2 = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput2");
+ createInputFile(path2, (writeCount - writeCount / 2));
+
+ TextInputFormat.setInputPaths(job, path, path2);
+ }
+
+ job.setOutputFormatClass(HCatOutputFormat.class);
+
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+ job.setNumReduceTasks(0);
+
+ HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
+
+ boolean success = job.waitForCompletion(true);
+
+ // Ensure counters are set when data has actually been read.
+ if (partitionValues != null) {
+ assertTrue(job.getCounters().getGroup("FileSystemCounters")
+ .findCounter("FILE_BYTES_READ").getValue() > 0);
+ }
+
+ if (!HCatUtil.isHadoop23()) {
+ // Local mode outputcommitter hook is not invoked in Hadoop 1.x
+ if (success) {
+ new FileOutputCommitterContainer(job, null).commitJob(job);
+ } else {
+ new FileOutputCommitterContainer(job, null).abortJob(job, JobStatus.State.FAILED);
+ }
+ }
+ if (assertWrite) {
+ // Only assert when the caller asked us to verify this write.
+ Assert.assertEquals(writeCount, MapCreate.writeCount);
+ }
+
+ return job;
+ }
+
+ List<HCatRecord> runMRRead(int readCount) throws Exception {
+ return runMRRead(readCount, null);
+ }
+
+ /**
+ * Run a local map reduce job to read records from an HCatalog table and verify the count is as expected.
+ * @param readCount expected number of records
+ * @param filter partition filter expression, or null for no filter
+ * @return the records that were read
+ * @throws Exception
+ */
+ List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
+
+ MapRead.readCount = 0;
+ readRecords.clear();
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "hcat mapreduce read test");
+ job.setJarByClass(this.getClass());
+ job.setMapperClass(HCatMapReduceTest.MapRead.class);
+
+ // input/output settings
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ HCatInputFormat.setInput(job, dbName, tableName).setFilter(filter);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setNumReduceTasks(0);
+
+ Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceOutput");
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+
+ TextOutputFormat.setOutputPath(job, path);
+
+ job.waitForCompletion(true);
+ Assert.assertEquals(readCount, MapRead.readCount);
+
+ return readRecords;
+ }
+
+
+ protected HCatSchema getTableSchema() throws Exception {
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "hcat mapreduce read schema test");
+ job.setJarByClass(this.getClass());
+
+ // input/output settings
+ job.setInputFormatClass(HCatInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ HCatInputFormat.setInput(job, dbName, tableName);
+
+ return HCatInputFormat.getTableSchema(job);
+ }
+
+}
+
+
+
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
+
+ private static List<HCatRecord> writeRecords;
+ private static List<HCatFieldSchema> dataColumns;
+ private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
+ protected static final int NUM_RECORDS = 20;
+ protected static final int NUM_PARTITIONS = 5;
+
+ @BeforeClass
+ public static void generateInputData() throws Exception {
+ tableName = "testHCatDynamicPartitionedTable";
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ generateDataColumns();
+ }
+
+ protected static void generateDataColumns() throws HCatException {
+ dataColumns = new ArrayList<HCatFieldSchema>();
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
+ dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, "")));
+ }
+
+ protected static void generateWriteRecords(int max, int mod, int offset) {
+ writeRecords = new ArrayList<HCatRecord>();
+
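+ // The partition value (i % mod) + offset cycles through 'mod' distinct values,
+ // spreading 'max' records across 'mod' partitions.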
+ for (int i = 0; i < max; i++) {
+ List<Object> objList = new ArrayList<Object>();
+
+ objList.add(i);
+ objList.add("strvalue" + i);
+ objList.add(String.valueOf((i % mod) + offset));
+ writeRecords.add(new DefaultHCatRecord(objList));
+ }
+ }
+
+ @Override
+ protected List<FieldSchema> getPartitionKeys() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ @Override
+ protected List<FieldSchema> getTableColumns() {
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ fields.add(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""));
+ fields.add(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""));
+ return fields;
+ }
+
+ /**
+ * Run the dynamic partitioning test with a single map task.
+ * @throws Exception
+ */
+ @Test
+ public void testHCatDynamicPartitionedTable() throws Exception {
+ runHCatDynamicPartitionedTable(true);
+ }
+
+ /**
+ * Run the dynamic partitioning test with multiple map tasks. See HCATALOG-490.
+ * @throws Exception
+ */
+ @Test
+ public void testHCatDynamicPartitionedTableMultipleTask() throws Exception {
+ runHCatDynamicPartitionedTable(false);
+ }
+
+ protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Exception {
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask);
+
+ runMRRead(NUM_RECORDS);
+
+ // Read back with partition filters
+ runMRRead(4, "p1 = \"0\"");
+ runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
+ runMRRead(4, "p1 = \"4\"");
+
+ // Read back through Hive to verify the row count
+
+ String query = "select * from " + tableName;
+ int retCode = driver.run(query).getResponseCode();
+
+ if (retCode != 0) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(NUM_RECORDS, res.size());
+
+ // A duplicate publish into the same partitions should fail.
+ IOException exc = null;
+ try {
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);
+
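+ // On Hadoop 2.x the duplicate publish surfaces as an unsuccessful job rather than an exception.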
+ if (HCatUtil.isHadoop23()) {
+ Assert.assertFalse(job.isSuccessful());
+ }
+ } catch (IOException e) {
+ exc = e;
+ }
+
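+ // On Hadoop 1.x it is reported as an HCatException instead.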
+ if (!HCatUtil.isHadoop23()) {
+ Assert.assertNotNull(exc);
+ assertTrue(exc instanceof HCatException);
+ assertTrue("Got exception of type [" + ((HCatException) exc).getErrorType().toString()
+ + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED",
+ (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType())
+ || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType())
+ );
+ }
+
+ query = "show partitions " + tableName;
+ retCode = driver.run(query).getResponseCode();
+ if (retCode != 0) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+ res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(NUM_PARTITIONS, res.size());
+
+ query = "select * from " + tableName;
+ retCode = driver.run(query).getResponseCode();
+ if (retCode != 0) {
+ throw new Exception("Error " + retCode + " running query " + query);
+ }
+ res = new ArrayList<String>();
+ driver.getResults(res);
+ assertEquals(NUM_RECORDS, res.size());
+ }
+
+ // TODO: the 1.0 miniCluster is slow and this test times out; make it work.
+ // Prefixed with an underscore so the test framework skips it.
+ public void _testHCatDynamicPartitionMaxPartitions() throws Exception {
+
+ int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+ LOG.info("Max partitions allowed = {}", maxParts);
+
+ IOException exc = null;
+ try {
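+ // maxParts + 2 distinct partition values exceed the configured maximum.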
+ generateWriteRecords(maxParts + 5, maxParts + 2, 10);
+ runMRCreate(null, dataColumns, writeRecords, maxParts + 5, false);
+ } catch (IOException e) {
+ exc = e;
+ }
+
+ if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED) {
+ Assert.assertNotNull(exc);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType());
+ } else {
+ Assert.assertNull(exc);
+ runMRRead(maxParts + 5);
+ }
+ }
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java.broken (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java.broken Fri Sep 6 00:49:14 2013
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.TestHCatEximInputFormat.TestImport.EmpDetails;
+
+/**
+ * TestHCatEximInputFormat. Primarily tests HCatEximInputFormat, but
+ * also exercises HCatEximOutputFormat.
+ */
+public class TestHCatEximInputFormat extends TestCase {
+
+ public static class TestExport extends
+ org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, HCatRecord> {
+
+ private HCatSchema recordSchema;
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
+ recordSchema = HCatEximOutputFormat.getTableSchema(context);
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String[] cols = value.toString().split(",");
+ HCatRecord record = new DefaultHCatRecord(recordSchema.size());
+ record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0]));
+ record.setString("emp_name", recordSchema, cols[1]);
+ record.setString("emp_dob", recordSchema, cols[2]);
+ record.setString("emp_sex", recordSchema, cols[3]);
+ context.write(key, record);
+ }
+ }
+
+ public static class TestImport extends
+ org.apache.hadoop.mapreduce.Mapper<
+ org.apache.hadoop.io.LongWritable, HCatRecord,
+ org.apache.hadoop.io.Text,
+ org.apache.hadoop.io.Text> {
+
+ private HCatSchema recordSchema;
+
+ public static class EmpDetails {
+ public String emp_name;
+ public String emp_dob;
+ public String emp_sex;
+ public String emp_country;
+ public String emp_state;
+ }
+
+ public static Map<Integer, EmpDetails> empRecords = new TreeMap<Integer, EmpDetails>();
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
+ try {
+ recordSchema = HCatBaseInputFormat.getOutputSchema(context);
+ } catch (Exception e) {
+ throw new IOException("Error getting output schema from job configuration", e);
+ }
+ System.out.println("RecordSchema : " + recordSchema.toString());
+ }
+
+ @Override
+ public void map(LongWritable key, HCatRecord value, Context context)
+ throws IOException, InterruptedException {
+ EmpDetails empDetails = new EmpDetails();
+ Integer emp_id = value.getInteger("emp_id", recordSchema);
+ String emp_name = value.getString("emp_name", recordSchema);
+ empDetails.emp_name = emp_name;
+ if (recordSchema.getPosition("emp_dob") != null) {
+ empDetails.emp_dob = value.getString("emp_dob", recordSchema);
+ }
+ if (recordSchema.getPosition("emp_sex") != null) {
+ empDetails.emp_sex = value.getString("emp_sex", recordSchema);
+ }
+ if (recordSchema.getPosition("emp_country") != null) {
+ empDetails.emp_country = value.getString("emp_country", recordSchema);
+ }
+ if (recordSchema.getPosition("emp_state") != null) {
+ empDetails.emp_state = value.getString("emp_state", recordSchema);
+ }
+ empRecords.put(emp_id, empDetails);
+ }
+ }
+
+ private static final String dbName = "hcatEximOutputFormatTestDB";
+ private static final String tblName = "hcatEximOutputFormatTestTable";
+ Configuration conf;
+ Job job;
+ List<HCatFieldSchema> columns;
+ HCatSchema schema;
+ FileSystem fs;
+ Path inputLocation;
+ Path outputLocation;
+ private HCatSchema partSchema;
+
+
+ @Override
+ protected void setUp() throws Exception {
+ System.out.println("Setup started");
+ super.setUp();
+ conf = new Configuration();
+ job = new Job(conf, "test eximinputformat");
+ columns = new ArrayList<HCatFieldSchema>();
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+ Constants.INT_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+ Constants.STRING_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
+ Constants.STRING_TYPE_NAME, "")));
+ columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
+ Constants.STRING_TYPE_NAME, "")));
+ schema = new HCatSchema(columns);
+
+ fs = new LocalFileSystem();
+ fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+ inputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports");
+ outputLocation = new Path(fs.getWorkingDirectory(), "tmp/data");
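+ // Naming note: outputLocation holds the raw text fed to the export job, while
+ // inputLocation receives the exported HCatalog data that is read back on import.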
+
+ job.setJarByClass(this.getClass());
+ job.setNumReduceTasks(0);
+ System.out.println("Setup done");
+ }
+
+ private void setupMRExport(String[] records) throws IOException {
+ if (fs.exists(outputLocation)) {
+ fs.delete(outputLocation, true);
+ }
+ FSDataOutputStream ds = fs.create(outputLocation, true);
+ for (String record : records) {
+ ds.writeBytes(record);
+ }
+ ds.close();
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(HCatEximOutputFormat.class);
+ TextInputFormat.setInputPaths(job, outputLocation);
+ job.setMapperClass(TestExport.class);
+ }
+
+ private void setupMRImport() throws IOException {
+ if (fs.exists(outputLocation)) {
+ fs.delete(outputLocation, true);
+ }
+ job.setInputFormatClass(HCatEximInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, outputLocation);
+ job.setMapperClass(TestImport.class);
+ TestImport.empRecords.clear();
+ }
+
+
+ @Override
+ protected void tearDown() throws Exception {
+ System.out.println("Teardown started");
+ super.tearDown();
+ // fs.delete(inputLocation, true);
+ // fs.delete(outputLocation, true);
+ System.out.println("Teardown done");
+ }
+
+
+ private void runNonPartExport() throws IOException, InterruptedException, ClassNotFoundException {
+ if (fs.exists(inputLocation)) {
+ fs.delete(inputLocation, true);
+ }
+ setupMRExport(new String[] {
+ "237,Krishna,01/01/1990,M,IN,TN\n",
+ "238,Kalpana,01/01/2000,F,IN,KA\n",
+ "239,Satya,01/01/2001,M,US,TN\n",
+ "240,Kavya,01/01/2002,F,US,KA\n"
+ });
+ HCatEximOutputFormat.setOutput(
+ job,
+ dbName,
+ tblName,
+ inputLocation.toString(),
+ null,
+ null,
+ schema);
+
+ job.waitForCompletion(true);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
+ committer.cleanupJob(job);
+ }
+
+ private void runPartExport(String record, String country, String state) throws IOException, InterruptedException, ClassNotFoundException {
+ setupMRExport(new String[] {record});
+ List<String> partValues = new ArrayList<String>(2);
+ partValues.add(country);
+ partValues.add(state);
+ HCatEximOutputFormat.setOutput(
+ job,
+ dbName,
+ tblName,
+ inputLocation.toString(),
+ partSchema,
+ partValues,
+ schema);
+
+ job.waitForCompletion(true);
+ HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job, null);
+ committer.cleanupJob(job);
+ }
+
+ public void testNonPart() throws Exception {
+ try {
+ runNonPartExport();
+ setUp();
+ setupMRImport();
+ HCatEximInputFormat.setInput(job, "tmp/exports", null);
+ job.waitForCompletion(true);
+
+ assertEquals(4, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", null, null);
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", null, null);
+ assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", null, null);
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", null, null);
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public void testNonPartProjection() throws Exception {
+ try {
+
+ runNonPartExport();
+ setUp();
+ setupMRImport();
+ HCatEximInputFormat.setInput(job, "tmp/exports", null);
+
+ List<HCatFieldSchema> readColumns = new ArrayList<HCatFieldSchema>();
+ readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+ Constants.INT_TYPE_NAME, "")));
+ readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+ Constants.STRING_TYPE_NAME, "")));
+
+ HCatEximInputFormat.setOutputSchema(job, new HCatSchema(readColumns));
+ job.waitForCompletion(true);
+
+ assertEquals(4, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(237), "Krishna", null, null, null, null);
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", null, null, null, null);
+ assertEmpDetail(TestImport.empRecords.get(239), "Satya", null, null, null, null);
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", null, null, null, null);
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public void testPart() throws Exception {
+ try {
+ if (fs.exists(inputLocation)) {
+ fs.delete(inputLocation, true);
+ }
+
+ List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
+ partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
+ partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
+ partSchema = new HCatSchema(partKeys);
+
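+ // setUp() is called between exports so each runPartExport gets a fresh Job instance.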
+ runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
+ setUp();
+ runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
+ setUp();
+ runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
+ setUp();
+ runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");
+
+ setUp();
+ setupMRImport();
+ HCatEximInputFormat.setInput(job, "tmp/exports", null);
+ job.waitForCompletion(true);
+
+ assertEquals(4, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn");
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
+ assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn");
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public void testPartWithPartCols() throws Exception {
+ try {
+ if (fs.exists(inputLocation)) {
+ fs.delete(inputLocation, true);
+ }
+
+ List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
+ partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
+ partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
+ partSchema = new HCatSchema(partKeys);
+
+ runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
+ setUp();
+ runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
+ setUp();
+ runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
+ setUp();
+ runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");
+
+ setUp();
+ setupMRImport();
+ HCatEximInputFormat.setInput(job, "tmp/exports", null);
+
+ List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
+ colsPlusPartKeys.addAll(columns);
+ colsPlusPartKeys.addAll(partKeys);
+
+ HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(colsPlusPartKeys));
+ job.waitForCompletion(true);
+
+ assertEquals(4, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn");
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
+ assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn");
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+
+ public void testPartSelection() throws Exception {
+ try {
+ if (fs.exists(inputLocation)) {
+ fs.delete(inputLocation, true);
+ }
+
+ List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
+ partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
+ partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
+ partSchema = new HCatSchema(partKeys);
+
+ runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
+ setUp();
+ runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
+ setUp();
+ runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
+ setUp();
+ runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");
+
+ setUp();
+ setupMRImport();
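+ // Restricting on the partition key imports only the matching partitions.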
+ Map<String, String> filter = new TreeMap<String, String>();
+ filter.put("emp_state", "ka");
+ HCatEximInputFormat.setInput(job, "tmp/exports", filter);
+ job.waitForCompletion(true);
+
+ assertEquals(2, TestImport.empRecords.size());
+ assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
+ assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
+ } catch (Exception e) {
+ System.out.println("Test failed with " + e.getMessage());
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+
+ private void assertEmpDetail(EmpDetails empDetails, String name, String dob, String mf, String country, String state) {
+ assertNotNull(empDetails);
+ assertEquals(name, empDetails.emp_name);
+ assertEquals(dob, empDetails.emp_dob);
+ assertEquals(mf, empDetails.emp_sex);
+ assertEquals(country, empDetails.emp_country);
+ assertEquals(state, empDetails.emp_state);
+ }
+
+}