You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC
svn commit: r1520466 [14/18] - in /hive/trunk/hcatalog:
core/src/main/java/org/apache/hcatalog/cli/
core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/
core/src/main/java/org/apache/hcatalog/common/
core/src/main/java/org/apache/hcatalog/data/...
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Pig {@link LoadFunc} test double that serves tuples registered in memory instead of reading
+ * them from storage. A test registers the tuples for a location with
+ * {@link #setData(String, Iterable)}; a loader whose location is later set to that string hands
+ * those tuples back from {@link #getNext()}.
+ */
+public class MockLoader extends LoadFunc {
+
+    /** Record reader stub: it always reports another record and returns fixed placeholder values. */
+    private static final class MockRecordReader extends RecordReader<Object, Object> {
+        @Override
+        public void close() throws IOException {
+            // nothing to release
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException, InterruptedException {
+            return "mockKey";
+        }
+
+        @Override
+        public Object getCurrentValue() throws IOException, InterruptedException {
+            return "mockValue";
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return 0.5f;
+        }
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException,
+            InterruptedException {
+            // no state to initialize
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return true;
+        }
+    }
+
+    /**
+     * Single split identified by its location string. It is {@link Writable}, so it also has a
+     * no-arg constructor; until {@link #readFields(DataInput)} runs, the location is {@code null}.
+     */
+    private static final class MockInputSplit extends InputSplit implements Writable {
+        private String location;
+
+        public MockInputSplit() {
+        }
+
+        public MockInputSplit(String location) {
+            this.location = location;
+        }
+
+        @Override
+        public String[] getLocations() throws IOException, InterruptedException {
+            return new String[]{location};
+        }
+
+        /** Returns a fixed, arbitrary length. */
+        @Override
+        public long getLength() throws IOException, InterruptedException {
+            return 10000000;
+        }
+
+        /** Identity comparison: two splits are equal only if they are the same object. */
+        @Override
+        public boolean equals(Object arg0) {
+            return arg0 == this;
+        }
+
+        /** Hash of the location; 0 when the location has not been set yet. */
+        @Override
+        public int hashCode() {
+            // A split created through the no-arg constructor has no location until readFields().
+            return location == null ? 0 : location.hashCode();
+        }
+
+        @Override
+        public void readFields(DataInput arg0) throws IOException {
+            location = arg0.readUTF();
+        }
+
+        @Override
+        public void write(DataOutput arg0) throws IOException {
+            arg0.writeUTF(location);
+        }
+    }
+
+    /** Input format that produces exactly one {@link MockInputSplit} for the configured location. */
+    private static final class MockInputFormat extends InputFormat<Object, Object> {
+
+        private final String location;
+
+        public MockInputFormat(String location) {
+            this.location = location;
+        }
+
+        @Override
+        public RecordReader<Object, Object> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+            throws IOException, InterruptedException {
+            return new MockRecordReader();
+        }
+
+        @Override
+        public List<InputSplit> getSplits(JobContext arg0) throws IOException, InterruptedException {
+            return Arrays.<InputSplit>asList(new MockInputSplit(location));
+        }
+    }
+
+    /** Registered test data, keyed by location string. Shared by all loader instances. */
+    private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>();
+
+    /**
+     * Registers the tuples to serve for a location, replacing any earlier registration.
+     *
+     * @param location location string a Pig script will load from
+     * @param data tuples to return for that location
+     */
+    public static void setData(String location, Iterable<Tuple> data) {
+        locationToData.put(location, data);
+    }
+
+    private String location;
+
+    private Iterator<Tuple> data;
+
+    /** Returns the location unchanged; mock locations are plain lookup keys, not paths. */
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+        return location;
+    }
+
+    /**
+     * Binds this loader to a location and starts a fresh iteration over its registered data.
+     *
+     * @throws IOException if {@code location} is null or no data was registered for it
+     */
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        if (location == null) {
+            throw new IOException("null location passed to MockLoader");
+        }
+        // Look the data up before calling iterator(): an unregistered location yields null here,
+        // and dereferencing it would raise a NullPointerException instead of the intended error.
+        Iterable<Tuple> registered = locationToData.get(location);
+        if (registered == null) {
+            throw new IOException("No data configured for location: " + location);
+        }
+        this.location = location;
+        this.data = registered.iterator();
+    }
+
+    /**
+     * Returns the next registered tuple, or {@code null} once the data is exhausted.
+     *
+     * @throws IOException if {@link #setLocation(String, Job)} has not successfully run
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        if (data == null) {
+            throw new IOException("data was not correctly initialized in MockLoader");
+        }
+        return data.hasNext() ? data.next() : null;
+    }
+
+    @Override
+    public InputFormat getInputFormat() throws IOException {
+        return new MockInputFormat(location);
+    }
+
+    /** No-op: tuples come from the in-memory registry, not from the record reader. */
+    @Override
+    public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
+    }
+
+}
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+
+/**
+ * {@link PigStorage} variant for tests that takes two constructor arguments and appends the
+ * second one to every tuple before storing it.
+ */
+public class MyPigStorage extends PigStorage {
+
+    /** Value appended as an extra trailing field to each stored tuple. */
+    String arg2;
+
+    /**
+     * @param arg1 passed through to {@link PigStorage#PigStorage(String)}
+     * @param arg2 value appended to every tuple handed to {@link #putNext(Tuple)}
+     */
+    public MyPigStorage(final String arg1, final String arg2) throws IOException {
+        super(arg1);
+        this.arg2 = arg2;
+    }
+
+    /** Appends the configured value to {@code t} (modifying it in place) and then stores it. */
+    @Override
+    public void putNext(final Tuple t) throws IOException {
+        final String extraField = this.arg2;
+        t.append(extraField);
+        super.putNext(t);
+    }
+}
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatEximLoader.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatEximLoader.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatEximLoader.java.broken (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatEximLoader.java.broken Fri Sep 6 00:49:14 2013
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hcatalog.MiniCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ *
+ * TestHCatEximLoader. Assumes Exim storer is working well
+ *
+ */
+public class TestHCatEximLoader extends TestCase {
+
+ private static final String NONPART_TABLE = "junit_unparted";
+ private static final String PARTITIONED_TABLE = "junit_parted";
+ private static MiniCluster cluster = MiniCluster.buildCluster();
+
+ private static final String dataLocation = "/tmp/data";
+ private static String fqdataLocation;
+ private static final String exportLocation = "/tmp/export";
+ private static String fqexportLocation;
+
+ private static Properties props;
+
+ private void cleanup() throws IOException {
+ MiniCluster.deleteFile(cluster, dataLocation);
+ MiniCluster.deleteFile(cluster, exportLocation);
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ props = new Properties();
+ props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+ System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
+ + ", fs.default.name : " + props.getProperty("fs.default.name"));
+ fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation;
+ fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation;
+ System.out.println("FQ Data Location :" + fqdataLocation);
+ System.out.println("FQ Export Location :" + fqexportLocation);
+ cleanup();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ cleanup();
+ }
+
+ private void populateDataFile() throws IOException {
+ MiniCluster.deleteFile(cluster, dataLocation);
+ String[] input = new String[] {
+ "237,Krishna,01/01/1990,M,IN,TN",
+ "238,Kalpana,01/01/2000,F,IN,KA",
+ "239,Satya,01/01/2001,M,US,TN",
+ "240,Kavya,01/01/2002,F,US,KA"
+ };
+ MiniCluster.createInputFile(cluster, dataLocation, input);
+ }
+
+ private static class EmpDetail {
+ String name;
+ String dob;
+ String mf;
+ String country;
+ String state;
+ }
+
+ private void assertEmpDetail(Tuple t, Map<Integer, EmpDetail> eds) throws ExecException {
+ assertNotNull(t);
+ assertEquals(6, t.size());
+
+ assertTrue(t.get(0).getClass() == Integer.class);
+ assertTrue(t.get(1).getClass() == String.class);
+ assertTrue(t.get(2).getClass() == String.class);
+ assertTrue(t.get(3).getClass() == String.class);
+ assertTrue(t.get(4).getClass() == String.class);
+ assertTrue(t.get(5).getClass() == String.class);
+
+ EmpDetail ed = eds.remove(t.get(0));
+ assertNotNull(ed);
+
+ assertEquals(ed.name, t.get(1));
+ assertEquals(ed.dob, t.get(2));
+ assertEquals(ed.mf, t.get(3));
+ assertEquals(ed.country, t.get(4));
+ assertEquals(ed.state, t.get(5));
+ }
+
+ private void addEmpDetail(Map<Integer, EmpDetail> empDetails, int id, String name,
+ String dob, String mf, String country, String state) {
+ EmpDetail ed = new EmpDetail();
+ ed.name = name;
+ ed.dob = dob;
+ ed.mf = mf;
+ ed.country = country;
+ ed.state = state;
+ empDetails.put(id, ed);
+ }
+
+
+
+ private void assertEmpDetail(Tuple t, Integer id, String name, String dob, String mf)
+ throws ExecException {
+ assertNotNull(t);
+ assertEquals(4, t.size());
+ assertTrue(t.get(0).getClass() == Integer.class);
+ assertTrue(t.get(1).getClass() == String.class);
+ assertTrue(t.get(2).getClass() == String.class);
+ assertTrue(t.get(3).getClass() == String.class);
+
+ assertEquals(id, t.get(0));
+ assertEquals(name, t.get(1));
+ assertEquals(dob, t.get(2));
+ assertEquals(mf, t.get(3));
+ }
+
+ private void assertEmpDetail(Tuple t, String mf, String name)
+ throws ExecException {
+ assertNotNull(t);
+ assertEquals(2, t.size());
+ assertTrue(t.get(0).getClass() == String.class);
+ assertTrue(t.get(1).getClass() == String.class);
+
+ assertEquals(mf, t.get(0));
+ assertEquals(name, t.get(1));
+ }
+
+
+
+ public void testLoadNonPartTable() throws Exception {
+ populateDataFile();
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server
+ .registerQuery("A = load '"
+ + fqdataLocation
+ + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+ server.registerQuery("store A into '" + NONPART_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
+ server.executeBatch();
+ }
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+
+ server
+ .registerQuery("A = load '"
+ + fqexportLocation
+ + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+ Iterator<Tuple> XIter = server.openIterator("A");
+ assertTrue(XIter.hasNext());
+ Tuple t = XIter.next();
+ assertEmpDetail(t, 237, "Krishna", "01/01/1990", "M");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, 238, "Kalpana", "01/01/2000", "F");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, 239, "Satya", "01/01/2001", "M");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, 240, "Kavya", "01/01/2002", "F");
+ assertFalse(XIter.hasNext());
+ }
+ }
+
+ public void testLoadNonPartProjection() throws Exception {
+ populateDataFile();
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server
+ .registerQuery("A = load '"
+ + fqdataLocation
+ + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+ server.registerQuery("store A into '" + NONPART_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
+ server.executeBatch();
+ }
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+
+ server
+ .registerQuery("A = load '"
+ + fqexportLocation
+ + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+ server.registerQuery("B = foreach A generate emp_sex, emp_name;");
+
+ Iterator<Tuple> XIter = server.openIterator("B");
+ assertTrue(XIter.hasNext());
+ Tuple t = XIter.next();
+ assertEmpDetail(t, "M", "Krishna");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, "F", "Kalpana");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, "M", "Satya");
+ assertTrue(XIter.hasNext());
+ t = XIter.next();
+ assertEmpDetail(t, "F", "Kavya");
+ assertFalse(XIter.hasNext());
+ }
+ }
+
+
+ public void testLoadMultiPartTable() throws Exception {
+ {
+ populateDataFile();
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server
+ .registerQuery("A = load '"
+ + fqdataLocation +
+ "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"
+ );
+ server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
+ server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
+ server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
+ server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
+ server.registerQuery("store INTN into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=in,emp_state=tn');");
+ server.registerQuery("store INKA into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=in,emp_state=ka');");
+ server.registerQuery("store USTN into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=us,emp_state=tn');");
+ server.registerQuery("store USKA into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=us,emp_state=ka');");
+ server.executeBatch();
+ }
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+
+ server
+ .registerQuery("A = load '"
+ + fqexportLocation
+ + "' using org.apache.hcatalog.pig.HCatEximLoader() "
+ //+ "as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);");
+ + ";");
+
+ Iterator<Tuple> XIter = server.openIterator("A");
+
+ Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
+ addEmpDetail(empDetails, 237, "Krishna", "01/01/1990", "M", "in", "tn");
+ addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
+ addEmpDetail(empDetails, 239, "Satya", "01/01/2001", "M", "us", "tn");
+ addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
+
+ while(XIter.hasNext()) {
+ Tuple t = XIter.next();
+ assertNotSame(0, empDetails.size());
+ assertEmpDetail(t, empDetails);
+ }
+ assertEquals(0, empDetails.size());
+ }
+ }
+
+ public void testLoadMultiPartFilter() throws Exception {
+ {
+ populateDataFile();
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server
+ .registerQuery("A = load '"
+ + fqdataLocation +
+ "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"
+ );
+ server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
+ server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
+ server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
+ server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
+ server.registerQuery("store INTN into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=in,emp_state=tn');");
+ server.registerQuery("store INKA into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=in,emp_state=ka');");
+ server.registerQuery("store USTN into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=us,emp_state=tn');");
+ server.registerQuery("store USKA into '" + PARTITIONED_TABLE
+ + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+ "', 'emp_country=us,emp_state=ka');");
+ server.executeBatch();
+ }
+ {
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+
+ server
+ .registerQuery("A = load '"
+ + fqexportLocation
+ + "' using org.apache.hcatalog.pig.HCatEximLoader() "
+ + ";");
+ server.registerQuery("B = filter A by emp_state == 'ka';");
+
+ Iterator<Tuple> XIter = server.openIterator("B");
+
+ Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
+ addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
+ addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
+
+ while(XIter.hasNext()) {
+ Tuple t = XIter.next();
+ assertNotSame(0, empDetails.size());
+ assertEmpDetail(t, empDetails);
+ }
+ assertEquals(0, empDetails.size());
+ }
+ }
+
+
+}
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.Pair;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+public class TestHCatLoader extends TestCase {
+ /** Directory for generated test files, below the current working directory's build tree. */
+ private static final String TEST_DATA_DIR =
+     System.getProperty("user.dir") + "/build/test/data/" + TestHCatLoader.class.getCanonicalName();
+ /** Hive warehouse directory used by the embedded driver. */
+ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+ /** Tab-separated input file for the basic (int, string) tables. */
+ private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
+ /** Input file for the table with complex column types. */
+ private static final String COMPLEX_FILE_NAME = TEST_DATA_DIR + "/complex.input.data";
+
+ private static final String BASIC_TABLE = "junit_unparted_basic";
+ private static final String COMPLEX_TABLE = "junit_unparted_complex";
+ private static final String PARTITIONED_TABLE = "junit_parted_basic";
+ private static final String SPECIFIC_SIZE_TABLE = "junit_specific_size";
+ /** Hive driver shared by all tests; created on the first setUp(). */
+ private static Driver driver;
+
+ /**
+  * Number of tearDown() calls after which the tables are dropped. It is hard-coded, so it has
+  * to be kept in step with the number of test methods by hand.
+  */
+ private static int guardTestCount = 6; // ugh, instantiate using introspection in guardedSetupBeforeClass
+ /** Set once the one-time setup has started, so later setUp() calls skip it. */
+ private static boolean setupHasRun = false;
+
+
+ /** Rows written to the basic input file, keyed by zero-based row index. */
+ private static Map<Integer, Pair<Integer, String>> basicInputData;
+
+ protected String storageFormat() {
+ return "RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+ "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver')";
+ }
+
+ private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+ driver.run("drop table " + tablename);
+ }
+
+ /**
+  * Creates a table through the shared driver, stored in the format from storageFormat().
+  *
+  * @param tablename name of the table to create
+  * @param schema column definitions, placed between the parentheses of the create statement
+  * @param partitionedBy partition column definitions; null or blank for an unpartitioned table
+  * @throws IOException if the driver reports a non-zero response code
+  */
+ private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
+     StringBuilder ddl = new StringBuilder("create table ");
+     ddl.append(tablename).append("(").append(schema).append(") ");
+
+     boolean partitioned = partitionedBy != null && !partitionedBy.trim().isEmpty();
+     if (partitioned) {
+         ddl.append("partitioned by (").append(partitionedBy).append(") ");
+     }
+     ddl.append("stored as ").append(storageFormat());
+
+     String createTable = ddl.toString();
+     int retCode = driver.run(createTable).getResponseCode();
+     if (retCode == 0) {
+         return;
+     }
+     throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + "]");
+ }
+
+ private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
+ createTable(tablename, schema, null);
+ }
+
+ protected void guardedSetUpBeforeClass() throws Exception {
+ if (!setupHasRun) {
+ setupHasRun = true;
+ } else {
+ return;
+ }
+
+ File f = new File(TEST_WAREHOUSE_DIR);
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ new File(TEST_WAREHOUSE_DIR).mkdirs();
+
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+
+ cleanup();
+
+ createTable(BASIC_TABLE, "a int, b string");
+ createTable(COMPLEX_TABLE,
+ "name string, studentid int, "
+ + "contact struct<phno:string,email:string>, "
+ + "currently_registered_courses array<string>, "
+ + "current_grades map<string,string>, "
+ + "phnos array<struct<phno:string,type:string>>");
+
+ createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
+ createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
+
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE * LOOP_SIZE];
+ basicInputData = new HashMap<Integer, Pair<Integer, String>>();
+ int k = 0;
+ for (int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for (int j = 1; j <= LOOP_SIZE; j++) {
+ String sj = "S" + j + "S";
+ input[k] = si + "\t" + sj;
+ basicInputData.put(k, new Pair<Integer, String>(i, sj));
+ k++;
+ }
+ }
+ HcatTestUtils.createTestDataFile(BASIC_FILE_NAME, input);
+ HcatTestUtils.createTestDataFile(COMPLEX_FILE_NAME,
+ new String[]{
+ //"Henry Jekyll\t42\t(415-253-6367,hjekyll@contemporary.edu.uk)\t{(PHARMACOLOGY),(PSYCHIATRY)},[PHARMACOLOGY#A-,PSYCHIATRY#B+],{(415-253-6367,cell),(408-253-6367,landline)}",
+ //"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}",
+ }
+ );
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);");
+
+ server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
+ server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
+ server.registerQuery("B = foreach A generate a,b;");
+ server.registerQuery("B2 = filter B by a < 2;");
+ server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
+
+ server.registerQuery("C = foreach A generate a,b;");
+ server.registerQuery("C2 = filter C by a >= 2;");
+ server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
+
+ server.registerQuery("D = load '" + COMPLEX_FILE_NAME + "' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});");
+ server.registerQuery("store D into '" + COMPLEX_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
+ server.executeBatch();
+
+ }
+
+ private void cleanup() throws IOException, CommandNeedRetryException {
+ dropTable(BASIC_TABLE);
+ dropTable(COMPLEX_TABLE);
+ dropTable(PARTITIONED_TABLE);
+ dropTable(SPECIFIC_SIZE_TABLE);
+ }
+
+ /**
+  * Counts down one finished test and drops the test tables once the counter is no longer
+  * positive. Called from every tearDown().
+  */
+ protected void guardedTearDownAfterClass() throws Exception {
+     guardTestCount -= 1;
+     if (guardTestCount <= 0) {
+         cleanup();
+     }
+ }
+
+ /** Per-test hook; the shared fixture is built only on the first invocation. */
+ @Override
+ protected void setUp() throws Exception {
+     this.guardedSetUpBeforeClass();
+ }
+
+ /** Per-test hook; the shared fixture is torn down only once the test counter runs out. */
+ @Override
+ protected void tearDown() throws Exception {
+     this.guardedTearDownAfterClass();
+ }
+
+ /** Verifies that HCatLoader reports the basic table's schema as (a: int, b: chararray). */
+ public void testSchemaLoadBasic() throws IOException {
+     PigServer server = new PigServer(ExecType.LOCAL);
+
+     // test that schema was loaded correctly
+     server.registerQuery("X = load '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+     Schema dumpedXSchema = server.dumpSchema("X");
+     List<FieldSchema> Xfields = dumpedXSchema.getFields();
+     assertEquals(2, Xfields.size());
+
+     // assertEquals (as in testSchemaLoadComplex) reports the actual type when it fails.
+     FieldSchema first = Xfields.get(0);
+     assertTrue("alias of field 0: " + first.alias, first.alias.equalsIgnoreCase("a"));
+     assertEquals(DataType.INTEGER, first.type);
+
+     FieldSchema second = Xfields.get(1);
+     assertTrue("alias of field 1: " + second.alias, second.alias.equalsIgnoreCase("b"));
+     assertEquals(DataType.CHARARRAY, second.type);
+ }
+
+ /**
+  * Reads the basic table back through HCatLoader and checks every tuple against the row written
+  * during setup at the same position.
+  */
+ public void testReadDataBasic() throws IOException {
+     PigServer server = new PigServer(ExecType.LOCAL);
+
+     server.registerQuery("X = load '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+     Iterator<Tuple> XIter = server.openIterator("X");
+     int numTuplesRead = 0;
+     while (XIter.hasNext()) {
+         Tuple t = XIter.next();
+         // Fail with a clear message, rather than a NullPointerException, on surplus tuples.
+         Pair<Integer, String> expected = basicInputData.get(numTuplesRead);
+         assertNotNull("unexpected extra tuple at index " + numTuplesRead, expected);
+
+         assertEquals(2, t.size());
+         assertEquals(Integer.class, t.get(0).getClass());
+         assertEquals(String.class, t.get(1).getClass());
+         // Expected value first, so that failure messages are labelled correctly.
+         assertEquals(expected.first, t.get(0));
+         assertEquals(expected.second, t.get(1));
+         numTuplesRead++;
+     }
+     assertEquals(basicInputData.size(), numTuplesRead);
+ }
+
+ /**
+  * Verifies the Pig schema HCatLoader reports for the complex table: struct columns appear as
+  * tuples, arrays as bags wrapping a single tuple, and maps as maps.
+  */
+ public void testSchemaLoadComplex() throws IOException {
+     PigServer server = new PigServer(ExecType.LOCAL);
+
+     // test that schema was loaded correctly
+     server.registerQuery("K = load '" + COMPLEX_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+     Schema dumpedKSchema = server.dumpSchema("K");
+     List<FieldSchema> Kfields = dumpedKSchema.getFields();
+     assertEquals(6, Kfields.size());
+
+     FieldSchema name = Kfields.get(0);
+     assertEquals(DataType.CHARARRAY, name.type);
+     assertEquals("name", name.alias.toLowerCase());
+
+     FieldSchema studentId = Kfields.get(1);
+     assertEquals(DataType.INTEGER, studentId.type);
+     assertEquals("studentid", studentId.alias.toLowerCase());
+
+     // contact: struct<phno,email> -> tuple(phno, email)
+     FieldSchema contact = Kfields.get(2);
+     assertEquals(DataType.TUPLE, contact.type);
+     assertEquals("contact", contact.alias.toLowerCase());
+     assertNotNull(contact.schema);
+     List<FieldSchema> contactFields = contact.schema.getFields();
+     assertEquals(2, contactFields.size());
+     assertEquals(DataType.CHARARRAY, contactFields.get(0).type);
+     assertTrue(contactFields.get(0).alias.equalsIgnoreCase("phno"));
+     assertEquals(DataType.CHARARRAY, contactFields.get(1).type);
+     assertTrue(contactFields.get(1).alias.equalsIgnoreCase("email"));
+
+     // currently_registered_courses: array<string> -> bag{tuple(chararray)}
+     FieldSchema courses = Kfields.get(3);
+     assertEquals(DataType.BAG, courses.type);
+     assertEquals("currently_registered_courses", courses.alias.toLowerCase());
+     assertNotNull(courses.schema);
+     assertEquals(1, courses.schema.getFields().size());
+     FieldSchema courseTuple = courses.schema.getFields().get(0);
+     assertEquals(DataType.TUPLE, courseTuple.type);
+     assertNotNull(courseTuple.schema);
+     assertEquals(1, courseTuple.schema.getFields().size());
+     assertEquals(DataType.CHARARRAY, courseTuple.schema.getFields().get(0).type);
+     // The inner alias is deliberately not checked: the name becomes "innerfield" by default.
+     // We call it "course" in pig, but in the metadata it'd be anonymous, so this would be
+     // autogenerated, which is fine.
+
+     FieldSchema grades = Kfields.get(4);
+     assertEquals(DataType.MAP, grades.type);
+     assertEquals("current_grades", grades.alias.toLowerCase());
+
+     // phnos: array<struct<phno,type>> -> bag{tuple(phno, type)}
+     FieldSchema phnos = Kfields.get(5);
+     assertEquals(DataType.BAG, phnos.type);
+     assertEquals("phnos", phnos.alias.toLowerCase());
+     assertNotNull(phnos.schema);
+     assertEquals(1, phnos.schema.getFields().size());
+     FieldSchema phnoTuple = phnos.schema.getFields().get(0);
+     assertEquals(DataType.TUPLE, phnoTuple.type);
+     assertNotNull(phnoTuple.schema);
+     List<FieldSchema> phnoFields = phnoTuple.schema.getFields();
+     assertEquals(2, phnoFields.size());
+     assertEquals(DataType.CHARARRAY, phnoFields.get(0).type);
+     assertEquals("phno", phnoFields.get(0).alias.toLowerCase());
+     assertEquals(DataType.CHARARRAY, phnoFields.get(1).type);
+     assertEquals("type", phnoFields.get(1).alias.toLowerCase());
+ }
+
+ /**
+  * Reads PARTITIONED_TABLE both through the Hive driver and through HCatLoader and checks that:
+  * the row counts agree with basicInputData; the Pig schema is (a int, b chararray, bkt chararray);
+  * rows with a &lt; 2 carry bkt "0" and all others bkt "1"; and filtering on bkt returns only
+  * rows of the requested partition.
+  *
+  * @throws IOException declared by the Pig calls and the Hive driver result fetch
+  * @throws CommandNeedRetryException declared by the Hive driver calls
+  */
+ public void testReadPartitionedBasic() throws IOException, CommandNeedRetryException {
+     PigServer server = new PigServer(ExecType.LOCAL);
+
+     // Baseline read through Hive; a failed query must fail the test right here.
+     assertEquals(0, driver.run("select * from " + PARTITIONED_TABLE).getResponseCode());
+     ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+     driver.getResults(valuesReadFromHiveDriver);
+     assertEquals(basicInputData.size(), valuesReadFromHiveDriver.size());
+
+     server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+     Schema dumpedWSchema = server.dumpSchema("W");
+     List<FieldSchema> Wfields = dumpedWSchema.getFields();
+     assertEquals(3, Wfields.size());
+     assertTrue("unexpected alias " + Wfields.get(0).alias, Wfields.get(0).alias.equalsIgnoreCase("a"));
+     assertEquals(DataType.INTEGER, Wfields.get(0).type);
+     assertTrue("unexpected alias " + Wfields.get(1).alias, Wfields.get(1).alias.equalsIgnoreCase("b"));
+     assertEquals(DataType.CHARARRAY, Wfields.get(1).type);
+     assertTrue("unexpected alias " + Wfields.get(2).alias, Wfields.get(2).alias.equalsIgnoreCase("bkt"));
+     assertEquals(DataType.CHARARRAY, Wfields.get(2).type);
+
+     Iterator<Tuple> WIter = server.openIterator("W");
+     Collection<Pair<Integer, String>> valuesRead = new ArrayList<Pair<Integer, String>>();
+     while (WIter.hasNext()) {
+         Tuple t = WIter.next();
+         assertEquals(3, t.size());
+         assertEquals(Integer.class, t.get(0).getClass());
+         assertEquals(String.class, t.get(1).getClass());
+         assertEquals(String.class, t.get(2).getClass());
+         valuesRead.add(new Pair<Integer, String>((Integer) t.get(0), (String) t.get(1)));
+         // The partition column value is tied to the value of a: below 2 -> "0", otherwise "1".
+         if ((Integer) t.get(0) < 2) {
+             assertEquals("0", t.get(2));
+         } else {
+             assertEquals("1", t.get(2));
+         }
+     }
+     assertEquals(valuesReadFromHiveDriver.size(), valuesRead.size());
+
+     // Filter on the partition column: bkt == '0'.
+     // NOTE(review): the expected counts (3 and 6) depend on how the fixture populated the
+     // table, which is not visible in this method.
+     server.registerQuery("P1 = load '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+     server.registerQuery("P1filter = filter P1 by bkt == '0';");
+     Iterator<Tuple> P1Iter = server.openIterator("P1filter");
+     int count1 = 0;
+     while (P1Iter.hasNext()) {
+         Tuple t = P1Iter.next();
+
+         assertEquals("0", t.get(2));
+         assertEquals(1, t.get(0));
+         count1++;
+     }
+     assertEquals(3, count1);
+
+     // Filter on the partition column: bkt == '1'.
+     server.registerQuery("P2 = load '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+     server.registerQuery("P2filter = filter P2 by bkt == '1';");
+     Iterator<Tuple> P2Iter = server.openIterator("P2filter");
+     int count2 = 0;
+     while (P2Iter.hasNext()) {
+         Tuple t = P2Iter.next();
+
+         assertEquals("1", t.get(2));
+         assertTrue("expected a > 1 but was " + t.get(0), ((Integer) t.get(0)) > 1);
+         count2++;
+     }
+     assertEquals(6, count2);
+ }
+
+ /**
+  * Verifies column projection on BASIC_TABLE: Y2 projects only column a, Y3 projects
+  * (b, a). Both the dumped schemas and the tuples read back are compared with
+  * basicInputData, and each projection must return as many tuples as basicInputData holds.
+  *
+  * @throws IOException declared by the Pig calls used to run the queries and read tuple fields
+  */
+ public void testProjectionsBasic() throws IOException {
+
+     PigServer server = new PigServer(ExecType.LOCAL);
+
+     // projections are handled by using generate, not "as" on the Load
+
+     server.registerQuery("Y1 = load '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+     server.registerQuery("Y2 = foreach Y1 generate a;");
+     server.registerQuery("Y3 = foreach Y1 generate b,a;");
+     Schema dumpedY2Schema = server.dumpSchema("Y2");
+     Schema dumpedY3Schema = server.dumpSchema("Y3");
+     List<FieldSchema> Y2fields = dumpedY2Schema.getFields();
+     List<FieldSchema> Y3fields = dumpedY3Schema.getFields();
+     assertEquals(1, Y2fields.size());
+     assertEquals("a", Y2fields.get(0).alias.toLowerCase());
+     assertEquals(DataType.INTEGER, Y2fields.get(0).type);
+     assertEquals(2, Y3fields.size());
+     assertEquals("b", Y3fields.get(0).alias.toLowerCase());
+     assertEquals(DataType.CHARARRAY, Y3fields.get(0).type);
+     assertEquals("a", Y3fields.get(1).alias.toLowerCase());
+     assertEquals(DataType.INTEGER, Y3fields.get(1).type);
+
+     int numTuplesRead = 0;
+     Iterator<Tuple> Y2Iter = server.openIterator("Y2");
+     while (Y2Iter.hasNext()) {
+         Tuple t = Y2Iter.next();
+         // Report surplus rows as a test failure rather than an IndexOutOfBoundsException.
+         assertTrue("Y2 returned more tuples than expected", numTuplesRead < basicInputData.size());
+         assertEquals(1, t.size());
+         assertEquals(Integer.class, t.get(0).getClass());
+         assertEquals(basicInputData.get(numTuplesRead).first, t.get(0));
+         numTuplesRead++;
+     }
+     // Without this check an empty Y2 result would pass, because the counter is reset below.
+     assertEquals(basicInputData.size(), numTuplesRead);
+
+     numTuplesRead = 0;
+     Iterator<Tuple> Y3Iter = server.openIterator("Y3");
+     while (Y3Iter.hasNext()) {
+         Tuple t = Y3Iter.next();
+         assertTrue("Y3 returned more tuples than expected", numTuplesRead < basicInputData.size());
+         assertEquals(2, t.size());
+         assertEquals(String.class, t.get(0).getClass());
+         assertEquals(basicInputData.get(numTuplesRead).second, t.get(0));
+         assertEquals(Integer.class, t.get(1).getClass());
+         assertEquals(basicInputData.get(numTuplesRead).first, t.get(1));
+         numTuplesRead++;
+     }
+     assertEquals(basicInputData.size(), numTuplesRead);
+ }
+
+ /**
+  * Creates a file of exactly 2 GiB under the SPECIFIC_SIZE_TABLE directory and checks
+  * that HCatLoader.getStatistics reports its size as 2048 MB.
+  *
+  * @throws Exception if the file cannot be created or sized, or the loader calls fail
+  */
+ public void testGetInputBytes() throws Exception {
+     File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
+     file.deleteOnExit();
+     // Only the length matters, so the file is extended without writing any data.
+     // The handle is closed as soon as the length is set so it is not leaked.
+     RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+     try {
+         randomAccessFile.setLength(2L * 1024 * 1024 * 1024);
+     } finally {
+         randomAccessFile.close();
+     }
+
+     Job job = new Job();
+     HCatLoader hCatLoader = new HCatLoader();
+     hCatLoader.setUDFContextSignature(this.getName());
+     hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job);
+     ResourceStatistics statistics = hCatLoader.getStatistics(file.getAbsolutePath(), job);
+     assertEquals(2048, (long) statistics.getmBytes());
+ }
+
+ /**
+  * Creates an external text table with a boolean column, loads it through HCatLoader with
+  * HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER set to "true", and checks that the
+  * boolean column is exposed to Pig as an integer, in both the schema and the data.
+  *
+  * @throws Exception if creating the test data, the Hive table or the Pig query fails
+  */
+ public void testConvertBooleanToInt() throws Exception {
+     String tbl = "test_convert_boolean_to_int";
+     String inputFileName = TEST_DATA_DIR + "/testConvertBooleanToInt/data.txt";
+     File inputDataDir = new File(inputFileName).getParentFile();
+     // Return value not checked; presumably the directory may already exist from an earlier run.
+     inputDataDir.mkdir();
+
+     String[] lines = new String[]{"llama\t1", "alpaca\t0"};
+     HcatTestUtils.createTestDataFile(inputFileName, lines);
+
+     assertEquals(0, driver.run("drop table if exists " + tbl).getResponseCode());
+     assertEquals(0, driver.run("create external table " + tbl +
+         " (a string, b boolean) row format delimited fields terminated by '\t'" +
+         " stored as textfile location 'file://" +
+         inputDataDir.getAbsolutePath() + "'").getResponseCode());
+
+     Properties properties = new Properties();
+     properties.setProperty(HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, "true");
+     PigServer server = new PigServer(ExecType.LOCAL, properties);
+     // Reuse tbl so the table created above and the table loaded here cannot drift apart.
+     server.registerQuery(
+         "data = load '" + tbl + "' using org.apache.hcatalog.pig.HCatLoader();");
+     Schema schema = server.dumpSchema("data");
+     assertEquals(2, schema.getFields().size());
+
+     assertEquals("a", schema.getField(0).alias);
+     assertEquals(DataType.CHARARRAY, schema.getField(0).type);
+     assertEquals("b", schema.getField(1).alias);
+     assertEquals(DataType.INTEGER, schema.getField(1).type);
+
+     Iterator<Tuple> iterator = server.openIterator("data");
+     // Guard each next() so a missing row is an assertion failure, not a NoSuchElementException.
+     assertTrue("expected a first row", iterator.hasNext());
+     Tuple t = iterator.next();
+     assertEquals("llama", t.get(0));
+     // TODO: Figure out how to load a text file into Hive with boolean columns. This next assert
+     // passes because data was loaded as integers, not because it was converted.
+     assertEquals(1, t.get(1));
+     assertTrue("expected a second row", iterator.hasNext());
+     t = iterator.next();
+     assertEquals("alpaca", t.get(0));
+     assertEquals(0, t.get(1));
+     assertFalse(iterator.hasNext());
+ }
+}
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Round-trip tests for nested schemas: Pig tuples supplied by MockLoader are stored into a
+ * Hive table with HCatStorer, read back with HCatLoader, and compared with the input.
+ */
+public class TestHCatLoaderComplexSchema {
+
+    //private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static Driver driver;
+    //private static Properties props;
+    private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderComplexSchema.class);
+
+    private static final TupleFactory tf = TupleFactory.getInstance();
+    private static final BagFactory bf = BagFactory.getInstance();
+
+    /**
+     * Drops the given table. The driver response is not inspected, so a failed drop is ignored
+     * (this is used for cleanup in a finally block).
+     */
+    private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+        driver.run("drop table " + tablename);
+    }
+
+    /**
+     * Creates an RCFile-backed table through the Hive driver.
+     *
+     * @param tablename     name of the table to create
+     * @param schema        column list placed between the parentheses of the create statement
+     * @param partitionedBy partition column list; null or blank creates an unpartitioned table
+     * @throws IOException if the Hive driver returns a non-zero response code
+     */
+    private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
+        String createTable;
+        createTable = "create table " + tablename + "(" + schema + ") ";
+        if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
+            createTable = createTable + "partitioned by (" + partitionedBy + ") ";
+        }
+        createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+            "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+        LOG.info("Creating table:\n {}", createTable);
+        CommandProcessorResponse result = driver.run(createTable);
+        int retCode = result.getResponseCode();
+        if (retCode != 0) {
+            throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + " " + result.getErrorMessage() + "]");
+        }
+    }
+
+    /** Creates an unpartitioned RCFile-backed table; see the three-argument overload. */
+    private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
+        createTable(tablename, schema, null);
+    }
+
+    /**
+     * Builds the shared Hive driver with pre/post execution hooks cleared and concurrency
+     * support disabled, and starts a CLI session state on the same configuration.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+
+        HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class);
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        driver = new Driver(hiveConf);
+        SessionState.start(new CliSessionState(hiveConf));
+        //props = new Properties();
+        //props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+
+    }
+
+    /** Shorthand: builds a Pig tuple from the given field values. */
+    private Tuple t(Object... objects) {
+        return tf.newTuple(Arrays.asList(objects));
+    }
+
+    /** Shorthand: builds a Pig bag holding the given tuples. */
+    private DataBag b(Tuple... objects) {
+        return bf.newDefaultBag(Arrays.asList(objects));
+    }
+
+    /**
+     * artificially complex nested schema to test nested schema conversion
+     * @throws Exception
+     */
+    @Test
+    public void testSyntheticComplexSchema() throws Exception {
+        String pigSchema =
+            "a: " +
+                "(" +
+                "aa: chararray, " +
+                "ab: long, " +
+                "ac: map[], " +
+                "ad: { t: (ada: long) }, " +
+                "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
+                "af: (afa: chararray, afb: long) " +
+                ")," +
+                "b: chararray, " +
+                "c: long, " +
+                "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } ";
+
+        // with extra structs
+        String tableSchema =
+            "a struct<" +
+                "aa: string, " +
+                "ab: bigint, " +
+                "ac: map<string, string>, " +
+                "ad: array<struct<ada:bigint>>, " +
+                "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+                "af: struct<afa: string, afb: bigint> " +
+                ">, " +
+                "b string, " +
+                "c bigint, " +
+                "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<struct<dca: bigint>>>>";
+
+        // without extra structs
+        String tableSchema2 =
+            "a struct<" +
+                "aa: string, " +
+                "ab: bigint, " +
+                "ac: map<string, string>, " +
+                "ad: array<bigint>, " +
+                "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+                "af: struct<afa: string, afb: bigint> " +
+                ">, " +
+                "b string, " +
+                "c bigint, " +
+                "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<bigint>>>";
+
+        List<Tuple> data = new ArrayList<Tuple>();
+        for (int i = 0; i < 10; i++) {
+            // A plain HashMap avoids the anonymous subclass (and captured test instance)
+            // that double-brace initialisation would create.
+            HashMap<String, String> ac = new HashMap<String, String>();
+            ac.put("ac test1", "test 1");
+            ac.put("ac test2", "test 2");
+
+            Tuple t = t(
+                t(
+                    "aa test",
+                    2L,
+                    ac,
+                    b(t(3L), t(4L)),
+                    b(t(5L, t("aeba test", 6L))),
+                    t("afa test", 7L)
+                ),
+                "b test",
+                (long) i,
+                b(t(8L, t("dba test", 9L), b(t(10L)))));
+
+            data.add(t);
+        }
+        verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, true);
+        verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, false);
+        verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, true);
+        verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, false);
+
+    }
+
+    /**
+     * Creates the table, stores {@code data} into it through HCatStorer, reads it back through
+     * HCatLoader and asserts that the tuples (compared by their string form, in order), the
+     * tuple count and the schema types all match the input. The table is dropped afterwards.
+     *
+     * @param tablename             table to create; the MockLoader input is registered as tablename + "Input"
+     * @param pigSchema             Pig schema applied to the MockLoader input
+     * @param tableSchema           Hive column list for the created table
+     * @param data                  tuples to write and expect back
+     * @param provideSchemaToStorer whether pigSchema is also passed to HCatStorer as its second argument
+     */
+    private void verifyWriteRead(String tablename, String pigSchema, String tableSchema, List<Tuple> data, boolean provideSchemaToStorer)
+        throws IOException, CommandNeedRetryException, ExecException, FrontendException {
+        MockLoader.setData(tablename + "Input", data);
+        try {
+            createTable(tablename, tableSchema);
+            PigServer server = new PigServer(ExecType.LOCAL);
+            server.setBatchOn();
+            server.registerQuery("A = load '" + tablename + "Input' using org.apache.hcatalog.pig.MockLoader() AS (" + pigSchema + ");");
+            Schema dumpedASchema = server.dumpSchema("A");
+            server.registerQuery("STORE A into '" + tablename + "' using org.apache.hcatalog.pig.HCatStorer("
+                + (provideSchemaToStorer ? "'', '" + pigSchema + "'" : "")
+                + ");");
+
+            ExecJob execJob = server.executeBatch().get(0);
+            if (!execJob.getStatistics().isSuccessful()) {
+                throw new RuntimeException("Import failed", execJob.getException());
+            }
+            // test that schema was loaded correctly
+            server.registerQuery("X = load '" + tablename + "' using org.apache.hcatalog.pig.HCatLoader();");
+            Schema dumpedXSchema = server.dumpSchema("X");
+            Iterator<Tuple> it = server.openIterator("X");
+            int tuplesRead = 0;
+            while (it.hasNext()) {
+                Tuple output = it.next();
+                // Report surplus rows as a test failure rather than an IndexOutOfBoundsException.
+                Assert.assertTrue("read more tuples than the " + data.size() + " written",
+                    tuplesRead < data.size());
+                Tuple input = data.get(tuplesRead++);
+                Assert.assertEquals(input.toString(), output.toString());
+                LOG.info("tuple : {} ", output);
+            }
+            // Without this check an empty or truncated read would pass silently.
+            Assert.assertEquals("number of tuples read back", data.size(), tuplesRead);
+
+            Assert.assertEquals(
+                "expected " + dumpedASchema + " but was " + dumpedXSchema + " (ignoring field names)",
+                "",
+                compareIgnoreFieldNames(dumpedASchema, dumpedXSchema));
+
+        } finally {
+            dropTable(tablename);
+        }
+    }
+
+    /**
+     * Recursively compares two schemas by size and field types only; aliases are ignored.
+     *
+     * @return an empty string when the schemas match, otherwise one newline-prefixed line
+     *         per mismatch found
+     */
+    private String compareIgnoreFieldNames(Schema expected, Schema got) throws FrontendException {
+        if (expected == null || got == null) {
+            if (expected == got) {
+                return "";
+            } else {
+                return "\nexpected " + expected + " got " + got;
+            }
+        }
+        if (expected.size() != got.size()) {
+            return "\nsize expected " + expected.size() + " (" + expected + ") got " + got.size() + " (" + got + ")";
+        }
+        StringBuilder message = new StringBuilder();
+        for (int i = 0; i < expected.size(); i++) {
+            FieldSchema expectedField = expected.getField(i);
+            FieldSchema gotField = got.getField(i);
+            if (expectedField.type != gotField.type) {
+                message.append("\ntype expected ").append(expectedField.type)
+                    .append(" (").append(expectedField).append(") got ").append(gotField.type)
+                    .append(" (").append(gotField).append(")");
+            } else {
+                // Same type: descend into the nested schemas (both null for simple types).
+                message.append(compareIgnoreFieldNames(expectedField.schema, gotField.schema));
+            }
+        }
+        return message.toString();
+    }
+
+    /**
+     * tests that unnecessary tuples are dropped while converting schema
+     * (Pig requires Tuples in Bags)
+     * @throws Exception
+     */
+    @Test
+    public void testTupleInBagInTupleInBag() throws Exception {
+        String pigSchema = "a: { b : ( c: { d: (i : long) } ) }";
+
+        String tableSchema = "a array< array< bigint > >";
+
+        List<Tuple> data = new ArrayList<Tuple>();
+        data.add(t(b(t(b(t(100L), t(101L))), t(b(t(110L))))));
+        data.add(t(b(t(b(t(200L))), t(b(t(210L))), t(b(t(220L))))));
+        data.add(t(b(t(b(t(300L), t(301L))))));
+        data.add(t(b(t(b(t(400L))), t(b(t(410L), t(411L), t(412L))))));
+
+
+        verifyWriteRead("TupleInBagInTupleInBag1", pigSchema, tableSchema, data, true);
+        verifyWriteRead("TupleInBagInTupleInBag2", pigSchema, tableSchema, data, false);
+
+        // test that we don't drop the unnecessary tuple if the table has the corresponding Struct
+        String tableSchema2 = "a array< struct< c: array< struct< i: bigint > > > >";
+
+        verifyWriteRead("TupleInBagInTupleInBag3", pigSchema, tableSchema2, data, true);
+        verifyWriteRead("TupleInBagInTupleInBag4", pigSchema, tableSchema2, data, false);
+
+    }
+
+    /**
+     * Round-trips a map column whose values are structs (tuples on the Pig side).
+     * @throws Exception
+     */
+    @Test
+    public void testMapWithComplexData() throws Exception {
+        String pigSchema = "a: long, b: map[]";
+        String tableSchema = "a bigint, b map<string, struct<aa:bigint, ab:string>>";
+
+        List<Tuple> data = new ArrayList<Tuple>();
+        for (int i = 0; i < 10; i++) {
+            HashMap<String, Object> bValue = new HashMap<String, Object>();
+            bValue.put("b test 1", t(1L, "test 1"));
+            bValue.put("b test 2", t(2L, "test 2"));
+
+            Tuple t = t(
+                (long) i,
+                bValue);
+
+            data.add(t);
+        }
+        verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true);
+        verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false);
+
+    }
+}
Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.mapreduce.HCatBaseTest;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Tests that require both HCatLoader and HCatStorer. For read-only or write-only functionality,
+ * please consider {@link TestHCatLoader} or {@link TestHCatStorer}.
+ */
+public class TestHCatLoaderStorer extends HCatBaseTest {
+
+    /**
+     * Ensure Pig can read/write tinyint/smallint columns.
+     * <p>
+     * Reading: a text table holding the min/max short and byte values is loaded through
+     * HCatLoader; both columns must surface as Pig integers with the exact values.
+     * Writing: storing in-range values through HCatStorer must complete, while a value one
+     * step outside either column's range must make the Pig job fail.
+     */
+    @Test
+    public void testSmallTinyInt() throws Exception {
+
+        String readTblName = "test_small_tiny_int";
+        File dataDir = new File(TEST_DATA_DIR + "/testSmallTinyIntData");
+        File dataFile = new File(dataDir, "testSmallTinyInt.tsv");
+
+        String writeTblName = "test_small_tiny_int_write";
+        File writeDataFile = new File(TEST_DATA_DIR, writeTblName + ".tsv");
+
+        FileUtil.fullyDelete(dataDir); // Might not exist
+        Assert.assertTrue(dataDir.mkdir());
+
+        HcatTestUtils.createTestDataFile(dataFile.getAbsolutePath(), new String[]{
+            String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE),
+            String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE)
+        });
+
+        // Create a table with smallint/tinyint columns, load data, and query from Hive.
+        Assert.assertEquals(0, driver.run("drop table if exists " + readTblName).getResponseCode());
+        Assert.assertEquals(0, driver.run("create external table " + readTblName +
+            " (my_small_int smallint, my_tiny_int tinyint)" +
+            " row format delimited fields terminated by '\t' stored as textfile").getResponseCode());
+        Assert.assertEquals(0, driver.run("load data local inpath '" +
+            dataDir.getAbsolutePath() + "' into table " + readTblName).getResponseCode());
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.registerQuery(
+            "data = load '" + readTblName + "' using org.apache.hcatalog.pig.HCatLoader();");
+
+        // Ensure Pig schema is correct.
+        Schema schema = server.dumpSchema("data");
+        Assert.assertEquals(2, schema.getFields().size());
+        Assert.assertEquals("my_small_int", schema.getField(0).alias);
+        Assert.assertEquals(DataType.INTEGER, schema.getField(0).type);
+        Assert.assertEquals("my_tiny_int", schema.getField(1).alias);
+        Assert.assertEquals(DataType.INTEGER, schema.getField(1).type);
+
+        // Ensure Pig can read data correctly. Each next() is guarded so that a missing row
+        // is an assertion failure rather than a NoSuchElementException.
+        Iterator<Tuple> it = server.openIterator("data");
+        Assert.assertTrue("expected a first row", it.hasNext());
+        Tuple t = it.next();
+        Assert.assertEquals(Integer.valueOf(Short.MIN_VALUE), t.get(0));
+        Assert.assertEquals(Integer.valueOf(Byte.MIN_VALUE), t.get(1));
+        Assert.assertTrue("expected a second row", it.hasNext());
+        t = it.next();
+        Assert.assertEquals(Integer.valueOf(Short.MAX_VALUE), t.get(0));
+        Assert.assertEquals(Integer.valueOf(Byte.MAX_VALUE), t.get(1));
+        Assert.assertFalse(it.hasNext());
+
+        // Ensure Pig can write correctly to smallint/tinyint columns. This means values within the
+        // bounds of the column type are written, and values outside throw an exception.
+        // NOTE(review): this table is created here, but the bounds-check helper stores into
+        // test_tbl instead - confirm whether writeTblName is still needed.
+        Assert.assertEquals(0, driver.run("drop table if exists " + writeTblName).getResponseCode());
+        Assert.assertEquals(0, driver.run("create table " + writeTblName +
+            " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode());
+
+        // Values within the column type bounds.
+        HcatTestUtils.createTestDataFile(writeDataFile.getAbsolutePath(), new String[]{
+            String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE),
+            String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE)
+        });
+        smallTinyIntBoundsCheckHelper(writeDataFile.getAbsolutePath(), ExecJob.JOB_STATUS.COMPLETED);
+
+        // Values outside the column type bounds will fail at runtime.
+        assertOutOfBoundsWriteFails("shortTooSmall.tsv", Short.MIN_VALUE - 1, 0);
+        assertOutOfBoundsWriteFails("shortTooBig.tsv", Short.MAX_VALUE + 1, 0);
+        assertOutOfBoundsWriteFails("byteTooSmall.tsv", 0, Byte.MIN_VALUE - 1);
+        assertOutOfBoundsWriteFails("byteTooBig.tsv", 0, Byte.MAX_VALUE + 1);
+    }
+
+    /**
+     * Writes a single-row data file under TEST_DATA_DIR and asserts that storing it into the
+     * smallint/tinyint table ends with a FAILED job.
+     *
+     * @param fileName name of the data file to create under TEST_DATA_DIR
+     * @param smallInt value for the smallint column
+     * @param tinyInt  value for the tinyint column
+     */
+    private void assertOutOfBoundsWriteFails(String fileName, int smallInt, int tinyInt)
+        throws Exception {
+        String path = TEST_DATA_DIR + "/" + fileName;
+        HcatTestUtils.createTestDataFile(path, new String[]{
+            String.format("%d\t%d", smallInt, tinyInt)});
+        smallTinyIntBoundsCheckHelper(path, ExecJob.JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Recreates test_tbl (smallint, tinyint), stores the tab-separated file at {@code data}
+     * into it through HCatStorer, and asserts the status of the first job of the batch.
+     *
+     * @param data           path of the tab-separated input file
+     * @param expectedStatus status the first Pig job must report
+     */
+    private void smallTinyIntBoundsCheckHelper(String data, ExecJob.JOB_STATUS expectedStatus)
+        throws Exception {
+        Assert.assertEquals(0, driver.run("drop table if exists test_tbl").getResponseCode());
+        Assert.assertEquals(0, driver.run("create table test_tbl" +
+            " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode());
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("data = load '" + data +
+            "' using PigStorage('\t') as (my_small_int:int, my_tiny_int:int);");
+        server.registerQuery(
+            "store data into 'test_tbl' using org.apache.hcatalog.pig.HCatStorer();");
+        List<ExecJob> jobs = server.executeBatch();
+        Assert.assertEquals(expectedStatus, jobs.get(0).getStatus());
+    }
+}