Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC

svn commit: r1520466 [14/18] - in /hive/trunk/hcatalog: core/src/main/java/org/apache/hcatalog/cli/ core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/ core/src/main/java/org/apache/hcatalog/common/ core/src/main/java/org/apache/hcatalog/data/...

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+
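+/**
+ * A mock {@link LoadFunc} for tests: instead of reading from a file system it
+ * serves tuples that were registered in memory for a given location via
+ * {@link #setData(String, Iterable)}. The nested InputFormat, InputSplit and
+ * RecordReader classes are stubs; {@link #getNext()} reads straight from the
+ * static map. Typical use (see TestHCatLoaderComplexSchema):
+ *
+ * <pre>
+ *   MockLoader.setData("myTestInput", tuples);
+ *   server.registerQuery("A = load 'myTestInput' using org.apache.hcatalog.pig.MockLoader();");
+ * </pre>
+ */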
+public class MockLoader extends LoadFunc {
+    private static final class MockRecordReader extends RecordReader<Object, Object> {
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException, InterruptedException {
+            return "mockKey";
+        }
+
+        @Override
+        public Object getCurrentValue() throws IOException, InterruptedException {
+            return "mockValue";
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return 0.5f;
+        }
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException,
+            InterruptedException {
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return true;
+        }
+    }
+
+    private static final class MockInputSplit extends InputSplit implements Writable {
+        private String location;
+
+        public MockInputSplit() {
+            // no-arg constructor required for Writable deserialization
+        }
+
+        public MockInputSplit(String location) {
+            this.location = location;
+        }
+
+        @Override
+        public String[] getLocations() throws IOException, InterruptedException {
+            return new String[]{location};
+        }
+
+        @Override
+        public long getLength() throws IOException, InterruptedException {
+            return 10000000; // arbitrary fixed size; the mock split is not backed by real data
+        }
+
+        @Override
+        public boolean equals(Object arg0) {
+            return arg0 == this;
+        }
+
+        @Override
+        public int hashCode() {
+            return location.hashCode();
+        }
+
+        @Override
+        public void readFields(DataInput arg0) throws IOException {
+            location = arg0.readUTF();
+        }
+
+        @Override
+        public void write(DataOutput arg0) throws IOException {
+            arg0.writeUTF(location);
+        }
+    }
+
+    private static final class MockInputFormat extends InputFormat<Object, Object> {
+
+        private final String location;
+
+        public MockInputFormat(String location) {
+            this.location = location;
+        }
+
+        @Override
+        public RecordReader<Object, Object> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+            throws IOException, InterruptedException {
+            return new MockRecordReader();
+        }
+
+        @Override
+        public List<InputSplit> getSplits(JobContext arg0) throws IOException, InterruptedException {
+            return Arrays.<InputSplit>asList(new MockInputSplit(location));
+        }
+    }
+
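+    // test data keyed by location; tests register it via setData() before running the load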
+    private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>();
+
+    public static void setData(String location, Iterable<Tuple> data) {
+        locationToData.put(location, data);
+    }
+
+    private String location;
+
+    private Iterator<Tuple> data;
+
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+        return location;
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        this.location = location;
+        if (location == null) {
+            throw new IOException("null location passed to MockLoader");
+        }
+        // look up the Iterable before calling iterator() so that a missing
+        // location fails with a clear IOException instead of an NPE
+        Iterable<Tuple> tuples = locationToData.get(location);
+        if (tuples == null) {
+            throw new IOException("No data configured for location: " + location);
+        }
+        this.data = tuples.iterator();
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        if (data == null) {
+            throw new IOException("data was not correctly initialized in MockLoader");
+        }
+        return data.hasNext() ? data.next() : null;
+    }
+
+    @Override
+    public InputFormat getInputFormat() throws IOException {
+        return new MockInputFormat(location);
+    }
+
+    @Override
+    public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
+    }
+
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+
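+/**
+ * A {@link PigStorage} variant for tests: the first constructor argument is the
+ * usual field delimiter, and the second is appended to every tuple before it is
+ * stored. An illustrative (hypothetical) invocation:
+ *
+ * <pre>
+ *   store A into 'out' using org.apache.hcatalog.pig.MyPigStorage(',', 'extra');
+ * </pre>
+ */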
+public class MyPigStorage extends PigStorage {
+
+    private final String arg2;
+
+    public MyPigStorage(String arg1, String arg2) throws IOException {
+        super(arg1);
+        this.arg2 = arg2;
+    }
+
+    @Override
+    public void putNext(Tuple t) throws IOException {
+        t.append(arg2);
+        super.putNext(t);
+    }
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatEximLoader.java.broken
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatEximLoader.java.broken?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatEximLoader.java.broken (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatEximLoader.java.broken Fri Sep  6 00:49:14 2013
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hcatalog.MiniCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ *
+ * Tests for HCatEximLoader. Assumes the Exim storer is working correctly.
+ *
+ */
+public class TestHCatEximLoader extends TestCase {
+
+  private static final String NONPART_TABLE = "junit_unparted";
+  private static final String PARTITIONED_TABLE = "junit_parted";
+  private static MiniCluster cluster = MiniCluster.buildCluster();
+
+  private static final String dataLocation = "/tmp/data";
+  private static String fqdataLocation;
+  private static final String exportLocation = "/tmp/export";
+  private static String fqexportLocation;
+
+  private static Properties props;
+
+  private void cleanup() throws IOException {
+    MiniCluster.deleteFile(cluster, dataLocation);
+    MiniCluster.deleteFile(cluster, exportLocation);
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
+        + ", fs.default.name : " + props.getProperty("fs.default.name"));
+    fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation;
+    fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation;
+    System.out.println("FQ Data Location :" + fqdataLocation);
+    System.out.println("FQ Export Location :" + fqexportLocation);
+    cleanup();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    cleanup();
+  }
+
+  private void populateDataFile() throws IOException {
+    MiniCluster.deleteFile(cluster, dataLocation);
+    String[] input = new String[] {
+        "237,Krishna,01/01/1990,M,IN,TN",
+        "238,Kalpana,01/01/2000,F,IN,KA",
+        "239,Satya,01/01/2001,M,US,TN",
+        "240,Kavya,01/01/2002,F,US,KA"
+    };
+    MiniCluster.createInputFile(cluster, dataLocation, input);
+  }
+
+  private static class EmpDetail {
+    String name;
+    String dob;
+    String mf;
+    String country;
+    String state;
+  }
+
+  private void assertEmpDetail(Tuple t, Map<Integer, EmpDetail> eds) throws ExecException {
+    assertNotNull(t);
+    assertEquals(6, t.size());
+
+    assertTrue(t.get(0).getClass() == Integer.class);
+    assertTrue(t.get(1).getClass() == String.class);
+    assertTrue(t.get(2).getClass() == String.class);
+    assertTrue(t.get(3).getClass() == String.class);
+    assertTrue(t.get(4).getClass() == String.class);
+    assertTrue(t.get(5).getClass() == String.class);
+
+    EmpDetail ed = eds.remove(t.get(0));
+    assertNotNull(ed);
+
+    assertEquals(ed.name, t.get(1));
+    assertEquals(ed.dob, t.get(2));
+    assertEquals(ed.mf, t.get(3));
+    assertEquals(ed.country, t.get(4));
+    assertEquals(ed.state, t.get(5));
+  }
+
+  private void addEmpDetail(Map<Integer, EmpDetail> empDetails, int id, String name,
+      String dob, String mf, String country, String state) {
+    EmpDetail ed = new EmpDetail();
+    ed.name = name;
+    ed.dob = dob;
+    ed.mf = mf;
+    ed.country = country;
+    ed.state = state;
+    empDetails.put(id, ed);
+  }
+
+
+
+  private void assertEmpDetail(Tuple t, Integer id, String name, String dob, String mf)
+      throws ExecException {
+    assertNotNull(t);
+    assertEquals(4, t.size());
+    assertTrue(t.get(0).getClass() == Integer.class);
+    assertTrue(t.get(1).getClass() == String.class);
+    assertTrue(t.get(2).getClass() == String.class);
+    assertTrue(t.get(3).getClass() == String.class);
+
+    assertEquals(id, t.get(0));
+    assertEquals(name, t.get(1));
+    assertEquals(dob, t.get(2));
+    assertEquals(mf, t.get(3));
+  }
+
+  private void assertEmpDetail(Tuple t, String mf, String name)
+      throws ExecException {
+    assertNotNull(t);
+    assertEquals(2, t.size());
+    assertTrue(t.get(0).getClass() == String.class);
+    assertTrue(t.get(1).getClass() == String.class);
+
+    assertEquals(mf, t.get(0));
+    assertEquals(name, t.get(1));
+  }
+
+
+
+  public void testLoadNonPartTable() throws Exception {
+    populateDataFile();
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+      server.setBatchOn();
+      server
+          .registerQuery("A = load '"
+              + fqdataLocation
+              + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+      server.registerQuery("store A into '" + NONPART_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
+      server.executeBatch();
+    }
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+
+      server
+          .registerQuery("A = load '"
+              + fqexportLocation
+              + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+      Iterator<Tuple> XIter = server.openIterator("A");
+      assertTrue(XIter.hasNext());
+      Tuple t = XIter.next();
+      assertEmpDetail(t, 237, "Krishna", "01/01/1990", "M");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, 238, "Kalpana", "01/01/2000", "F");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, 239, "Satya", "01/01/2001", "M");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, 240, "Kavya", "01/01/2002", "F");
+      assertFalse(XIter.hasNext());
+    }
+  }
+
+  public void testLoadNonPartProjection() throws Exception {
+    populateDataFile();
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+      server.setBatchOn();
+      server
+          .registerQuery("A = load '"
+              + fqdataLocation
+              + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+      server.registerQuery("store A into '" + NONPART_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
+      server.executeBatch();
+    }
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+
+      server
+          .registerQuery("A = load '"
+              + fqexportLocation
+              + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+      server.registerQuery("B = foreach A generate emp_sex, emp_name;");
+
+      Iterator<Tuple> XIter = server.openIterator("B");
+      assertTrue(XIter.hasNext());
+      Tuple t = XIter.next();
+      assertEmpDetail(t, "M", "Krishna");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, "F", "Kalpana");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, "M", "Satya");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, "F", "Kavya");
+      assertFalse(XIter.hasNext());
+    }
+  }
+
+
+  public void testLoadMultiPartTable() throws Exception {
+    {
+      populateDataFile();
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+      server.setBatchOn();
+      server
+          .registerQuery("A = load '"
+              + fqdataLocation +
+              "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"
+          );
+      server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
+      server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
+      server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
+      server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
+      server.registerQuery("store INTN into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=in,emp_state=tn');");
+      server.registerQuery("store INKA into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=in,emp_state=ka');");
+      server.registerQuery("store USTN into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=us,emp_state=tn');");
+      server.registerQuery("store USKA into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=us,emp_state=ka');");
+      server.executeBatch();
+    }
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+
+      server
+          .registerQuery("A = load '"
+              + fqexportLocation
+              + "' using org.apache.hcatalog.pig.HCatEximLoader() "
+              //+ "as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);");
+              + ";");
+
+      Iterator<Tuple> XIter = server.openIterator("A");
+
+      Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
+      addEmpDetail(empDetails, 237, "Krishna", "01/01/1990", "M", "in", "tn");
+      addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
+      addEmpDetail(empDetails, 239, "Satya", "01/01/2001", "M", "us", "tn");
+      addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
+
+      while (XIter.hasNext()) {
+        Tuple t = XIter.next();
+        assertNotSame(0, empDetails.size());
+        assertEmpDetail(t, empDetails);
+      }
+      assertEquals(0, empDetails.size());
+    }
+  }
+
+  public void testLoadMultiPartFilter() throws Exception {
+    {
+      populateDataFile();
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+      server.setBatchOn();
+      server
+          .registerQuery("A = load '"
+              + fqdataLocation +
+              "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"
+          );
+      server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
+      server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
+      server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
+      server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
+      server.registerQuery("store INTN into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=in,emp_state=tn');");
+      server.registerQuery("store INKA into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=in,emp_state=ka');");
+      server.registerQuery("store USTN into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=us,emp_state=tn');");
+      server.registerQuery("store USKA into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=us,emp_state=ka');");
+      server.executeBatch();
+    }
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+
+      server
+          .registerQuery("A = load '"
+              + fqexportLocation
+              + "' using org.apache.hcatalog.pig.HCatEximLoader() "
+              + ";");
+      server.registerQuery("B = filter A by emp_state == 'ka';");
+
+      Iterator<Tuple> XIter = server.openIterator("B");
+
+      Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
+      addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
+      addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
+
+      while (XIter.hasNext()) {
+        Tuple t = XIter.next();
+        assertNotSame(0, empDetails.size());
+        assertEmpDetail(t, empDetails);
+      }
+      assertEquals(0, empDetails.size());
+    }
+  }
+
+
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.Pair;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+public class TestHCatLoader extends TestCase {
+    private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
+        "/build/test/data/" + TestHCatLoader.class.getCanonicalName();
+    private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+    private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
+    private static final String COMPLEX_FILE_NAME = TEST_DATA_DIR + "/complex.input.data";
+
+    private static final String BASIC_TABLE = "junit_unparted_basic";
+    private static final String COMPLEX_TABLE = "junit_unparted_complex";
+    private static final String PARTITIONED_TABLE = "junit_parted_basic";
+    private static final String SPECIFIC_SIZE_TABLE = "junit_specific_size";
+    private static Driver driver;
+
+    private static int guardTestCount = 7; // must match the number of test methods; ideally derived via introspection in guardedSetUpBeforeClass
+    private static boolean setupHasRun = false;
+
+
+    private static Map<Integer, Pair<Integer, String>> basicInputData;
+
+    protected String storageFormat() {
+        return "RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+            "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver')";
+    }
+
+    private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+        driver.run("drop table " + tablename);
+    }
+
+    private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
+        String createTable;
+        createTable = "create table " + tablename + "(" + schema + ") ";
+        if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
+            createTable = createTable + "partitioned by (" + partitionedBy + ") ";
+        }
+        createTable = createTable + "stored as " +storageFormat();
+        int retCode = driver.run(createTable).getResponseCode();
+        if (retCode != 0) {
+            throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + "]");
+        }
+    }
+
+    private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
+        createTable(tablename, schema, null);
+    }
+
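+    // JUnit-3 substitute for @BeforeClass/@AfterClass: setUp runs the class-level
+    // setup exactly once, and tearDown counts tests down so cleanup() only fires
+    // after the last test method has run.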
+    protected void guardedSetUpBeforeClass() throws Exception {
+        if (!setupHasRun) {
+            setupHasRun = true;
+        } else {
+            return;
+        }
+
+        File f = new File(TEST_WAREHOUSE_DIR);
+        if (f.exists()) {
+            FileUtil.fullyDelete(f);
+        }
+        new File(TEST_WAREHOUSE_DIR).mkdirs();
+
+        HiveConf hiveConf = new HiveConf(this.getClass());
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+        driver = new Driver(hiveConf);
+        SessionState.start(new CliSessionState(hiveConf));
+
+        cleanup();
+
+        createTable(BASIC_TABLE, "a int, b string");
+        createTable(COMPLEX_TABLE,
+            "name string, studentid int, "
+                + "contact struct<phno:string,email:string>, "
+                + "currently_registered_courses array<string>, "
+                + "current_grades map<string,string>, "
+                + "phnos array<struct<phno:string,type:string>>");
+
+        createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
+        createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
+
+        int LOOP_SIZE = 3;
+        String[] input = new String[LOOP_SIZE * LOOP_SIZE];
+        basicInputData = new HashMap<Integer, Pair<Integer, String>>();
+        int k = 0;
+        for (int i = 1; i <= LOOP_SIZE; i++) {
+            String si = i + "";
+            for (int j = 1; j <= LOOP_SIZE; j++) {
+                String sj = "S" + j + "S";
+                input[k] = si + "\t" + sj;
+                basicInputData.put(k, new Pair<Integer, String>(i, sj));
+                k++;
+            }
+        }
+        HcatTestUtils.createTestDataFile(BASIC_FILE_NAME, input);
+        HcatTestUtils.createTestDataFile(COMPLEX_FILE_NAME,
+            new String[]{
+                //"Henry Jekyll\t42\t(415-253-6367,hjekyll@contemporary.edu.uk)\t{(PHARMACOLOGY),(PSYCHIATRY)},[PHARMACOLOGY#A-,PSYCHIATRY#B+],{(415-253-6367,cell),(408-253-6367,landline)}",
+                //"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}",
+            }
+        );
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);");
+
+        server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
+        server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
+        server.registerQuery("B = foreach A generate a,b;");
+        server.registerQuery("B2 = filter B by a < 2;");
+        server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
+
+        server.registerQuery("C = foreach A generate a,b;");
+        server.registerQuery("C2 = filter C by a >= 2;");
+        server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
+
+        server.registerQuery("D = load '" + COMPLEX_FILE_NAME + "' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});");
+        server.registerQuery("store D into '" + COMPLEX_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
+        server.executeBatch();
+
+    }
+
+    private void cleanup() throws IOException, CommandNeedRetryException {
+        dropTable(BASIC_TABLE);
+        dropTable(COMPLEX_TABLE);
+        dropTable(PARTITIONED_TABLE);
+        dropTable(SPECIFIC_SIZE_TABLE);
+    }
+
+    protected void guardedTearDownAfterClass() throws Exception {
+        guardTestCount--;
+        if (guardTestCount > 0) {
+            return;
+        }
+        cleanup();
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        guardedSetUpBeforeClass();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        guardedTearDownAfterClass();
+    }
+
+    public void testSchemaLoadBasic() throws IOException {
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        // test that schema was loaded correctly
+        server.registerQuery("X = load '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        Schema dumpedXSchema = server.dumpSchema("X");
+        List<FieldSchema> Xfields = dumpedXSchema.getFields();
+        assertEquals(2, Xfields.size());
+        assertTrue(Xfields.get(0).alias.equalsIgnoreCase("a"));
+        assertTrue(Xfields.get(0).type == DataType.INTEGER);
+        assertTrue(Xfields.get(1).alias.equalsIgnoreCase("b"));
+        assertTrue(Xfields.get(1).type == DataType.CHARARRAY);
+
+    }
+
+    public void testReadDataBasic() throws IOException {
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        server.registerQuery("X = load '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> XIter = server.openIterator("X");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(2, t.size());
+            assertTrue(t.get(0).getClass() == Integer.class);
+            assertTrue(t.get(1).getClass() == String.class);
+            assertEquals(t.get(0), basicInputData.get(numTuplesRead).first);
+            assertEquals(t.get(1), basicInputData.get(numTuplesRead).second);
+            numTuplesRead++;
+        }
+        assertEquals(basicInputData.size(), numTuplesRead);
+    }
+
+    public void testSchemaLoadComplex() throws IOException {
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        // test that schema was loaded correctly
+        server.registerQuery("K = load '" + COMPLEX_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        Schema dumpedKSchema = server.dumpSchema("K");
+        List<FieldSchema> Kfields = dumpedKSchema.getFields();
+        assertEquals(6, Kfields.size());
+
+        assertEquals(DataType.CHARARRAY, Kfields.get(0).type);
+        assertEquals("name", Kfields.get(0).alias.toLowerCase());
+
+        assertEquals(DataType.INTEGER, Kfields.get(1).type);
+        assertEquals("studentid", Kfields.get(1).alias.toLowerCase());
+
+        assertEquals(DataType.TUPLE, Kfields.get(2).type);
+        assertEquals("contact", Kfields.get(2).alias.toLowerCase());
+        {
+            assertNotNull(Kfields.get(2).schema);
+            assertTrue(Kfields.get(2).schema.getFields().size() == 2);
+            assertTrue(Kfields.get(2).schema.getFields().get(0).type == DataType.CHARARRAY);
+            assertTrue(Kfields.get(2).schema.getFields().get(0).alias.equalsIgnoreCase("phno"));
+            assertTrue(Kfields.get(2).schema.getFields().get(1).type == DataType.CHARARRAY);
+            assertTrue(Kfields.get(2).schema.getFields().get(1).alias.equalsIgnoreCase("email"));
+        }
+        assertEquals(DataType.BAG, Kfields.get(3).type);
+        assertEquals("currently_registered_courses", Kfields.get(3).alias.toLowerCase());
+        {
+            assertNotNull(Kfields.get(3).schema);
+            assertEquals(1, Kfields.get(3).schema.getFields().size());
+            assertEquals(DataType.TUPLE, Kfields.get(3).schema.getFields().get(0).type);
+            assertNotNull(Kfields.get(3).schema.getFields().get(0).schema);
+            assertEquals(1, Kfields.get(3).schema.getFields().get(0).schema.getFields().size());
+            assertEquals(DataType.CHARARRAY, Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).type);
+            // assertEquals("course",Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
+            // commented out, because the name becomes "innerfield" by default - we call it "course" in pig,
+            // but in the metadata, it'd be anonymous, so this would be autogenerated, which is fine
+        }
+        assertEquals(DataType.MAP, Kfields.get(4).type);
+        assertEquals("current_grades", Kfields.get(4).alias.toLowerCase());
+        assertEquals(DataType.BAG, Kfields.get(5).type);
+        assertEquals("phnos", Kfields.get(5).alias.toLowerCase());
+        {
+            assertNotNull(Kfields.get(5).schema);
+            assertEquals(1, Kfields.get(5).schema.getFields().size());
+            assertEquals(DataType.TUPLE, Kfields.get(5).schema.getFields().get(0).type);
+            assertNotNull(Kfields.get(5).schema.getFields().get(0).schema);
+            assertTrue(Kfields.get(5).schema.getFields().get(0).schema.getFields().size() == 2);
+            assertEquals(DataType.CHARARRAY, Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).type);
+            assertEquals("phno", Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
+            assertEquals(DataType.CHARARRAY, Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).type);
+            assertEquals("type", Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).alias.toLowerCase());
+        }
+
+    }
+
+    public void testReadPartitionedBasic() throws IOException, CommandNeedRetryException {
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        driver.run("select * from " + PARTITIONED_TABLE);
+        ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+        driver.getResults(valuesReadFromHiveDriver);
+        assertEquals(basicInputData.size(), valuesReadFromHiveDriver.size());
+
+        server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        Schema dumpedWSchema = server.dumpSchema("W");
+        List<FieldSchema> Wfields = dumpedWSchema.getFields();
+        assertEquals(3, Wfields.size());
+        assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
+        assertTrue(Wfields.get(0).type == DataType.INTEGER);
+        assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
+        assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
+        assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
+        assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
+
+        Iterator<Tuple> WIter = server.openIterator("W");
+        Collection<Pair<Integer, String>> valuesRead = new ArrayList<Pair<Integer, String>>();
+        while (WIter.hasNext()) {
+            Tuple t = WIter.next();
+            assertTrue(t.size() == 3);
+            assertTrue(t.get(0).getClass() == Integer.class);
+            assertTrue(t.get(1).getClass() == String.class);
+            assertTrue(t.get(2).getClass() == String.class);
+            valuesRead.add(new Pair<Integer, String>((Integer) t.get(0), (String) t.get(1)));
+            if ((Integer) t.get(0) < 2) {
+                assertEquals("0", t.get(2));
+            } else {
+                assertEquals("1", t.get(2));
+            }
+        }
+        assertEquals(valuesReadFromHiveDriver.size(), valuesRead.size());
+
+        server.registerQuery("P1 = load '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        server.registerQuery("P1filter = filter P1 by bkt == '0';");
+        Iterator<Tuple> P1Iter = server.openIterator("P1filter");
+        int count1 = 0;
+        while (P1Iter.hasNext()) {
+            Tuple t = P1Iter.next();
+
+            assertEquals("0", t.get(2));
+            assertEquals(1, t.get(0));
+            count1++;
+        }
+        assertEquals(3, count1);
+
+        server.registerQuery("P2 = load '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        server.registerQuery("P2filter = filter P2 by bkt == '1';");
+        Iterator<Tuple> P2Iter = server.openIterator("P2filter");
+        int count2 = 0;
+        while (P2Iter.hasNext()) {
+            Tuple t = P2Iter.next();
+
+            assertEquals("1", t.get(2));
+            assertTrue(((Integer) t.get(0)) > 1);
+            count2++;
+        }
+        assertEquals(6, count2);
+    }
+
+    public void testProjectionsBasic() throws IOException {
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        // projections are handled by using generate, not "as" on the Load
+
+        server.registerQuery("Y1 = load '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        server.registerQuery("Y2 = foreach Y1 generate a;");
+        server.registerQuery("Y3 = foreach Y1 generate b,a;");
+        Schema dumpedY2Schema = server.dumpSchema("Y2");
+        Schema dumpedY3Schema = server.dumpSchema("Y3");
+        List<FieldSchema> Y2fields = dumpedY2Schema.getFields();
+        List<FieldSchema> Y3fields = dumpedY3Schema.getFields();
+        assertEquals(1, Y2fields.size());
+        assertEquals("a", Y2fields.get(0).alias.toLowerCase());
+        assertEquals(DataType.INTEGER, Y2fields.get(0).type);
+        assertEquals(2, Y3fields.size());
+        assertEquals("b", Y3fields.get(0).alias.toLowerCase());
+        assertEquals(DataType.CHARARRAY, Y3fields.get(0).type);
+        assertEquals("a", Y3fields.get(1).alias.toLowerCase());
+        assertEquals(DataType.INTEGER, Y3fields.get(1).type);
+
+        int numTuplesRead = 0;
+        Iterator<Tuple> Y2Iter = server.openIterator("Y2");
+        while (Y2Iter.hasNext()) {
+            Tuple t = Y2Iter.next();
+            assertEquals(1, t.size());
+            assertTrue(t.get(0).getClass() == Integer.class);
+            assertEquals(t.get(0), basicInputData.get(numTuplesRead).first);
+            numTuplesRead++;
+        }
+        numTuplesRead = 0;
+        Iterator<Tuple> Y3Iter = server.openIterator("Y3");
+        while (Y3Iter.hasNext()) {
+            Tuple t = Y3Iter.next();
+            assertEquals(2, t.size());
+            assertTrue(t.get(0).getClass() == String.class);
+            assertEquals(t.get(0), basicInputData.get(numTuplesRead).second);
+            assertTrue(t.get(1).getClass() == Integer.class);
+            assertEquals(t.get(1), basicInputData.get(numTuplesRead).first);
+            numTuplesRead++;
+        }
+        assertEquals(basicInputData.size(), numTuplesRead);
+    }
+
+    public void testGetInputBytes() throws Exception {
+        File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
+        file.deleteOnExit();
+        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
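+        // setLength creates a 2 GB (typically sparse) file; HCatLoader should report 2048 MB of input below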
+        randomAccessFile.setLength(2L * 1024 * 1024 * 1024);
+
+        Job job = new Job();
+        HCatLoader hCatLoader = new HCatLoader();
+        hCatLoader.setUDFContextSignature(this.getName());
+        hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job);
+        ResourceStatistics statistics = hCatLoader.getStatistics(file.getAbsolutePath(), job);
+        assertEquals(2048, (long) statistics.getmBytes());
+    }
+
+    public void testConvertBooleanToInt() throws Exception {
+        String tbl = "test_convert_boolean_to_int";
+        String inputFileName = TEST_DATA_DIR + "/testConvertBooleanToInt/data.txt";
+        File inputDataDir = new File(inputFileName).getParentFile();
+        inputDataDir.mkdir();
+
+        String[] lines = new String[]{"llama\t1", "alpaca\t0"};
+        HcatTestUtils.createTestDataFile(inputFileName, lines);
+
+        assertEquals(0, driver.run("drop table if exists " + tbl).getResponseCode());
+        assertEquals(0, driver.run("create external table " + tbl +
+            " (a string, b boolean) row format delimited fields terminated by '\t'" +
+            " stored as textfile location 'file://" +
+            inputDataDir.getAbsolutePath() + "'").getResponseCode());
+
+        Properties properties = new Properties();
+        properties.setProperty(HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, "true");
+        PigServer server = new PigServer(ExecType.LOCAL, properties);
+        server.registerQuery(
+            "data = load 'test_convert_boolean_to_int' using org.apache.hcatalog.pig.HCatLoader();");
+        Schema schema = server.dumpSchema("data");
+        assertEquals(2, schema.getFields().size());
+
+        assertEquals("a", schema.getField(0).alias);
+        assertEquals(DataType.CHARARRAY, schema.getField(0).type);
+        assertEquals("b", schema.getField(1).alias);
+        assertEquals(DataType.INTEGER, schema.getField(1).type);
+
+        Iterator<Tuple> iterator = server.openIterator("data");
+        Tuple t = iterator.next();
+        assertEquals("llama", t.get(0));
+        // TODO: Figure out how to load a text file into Hive with boolean columns. This next assert
+        // passes because data was loaded as integers, not because it was converted.
+        assertEquals(1, t.get(1));
+        t = iterator.next();
+        assertEquals("alpaca", t.get(0));
+        assertEquals(0, t.get(1));
+        assertFalse(iterator.hasNext());
+    }
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHCatLoaderComplexSchema {
+
+    //private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static Driver driver;
+    //private static Properties props;
+    private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderComplexSchema.class);
+
+    private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+        driver.run("drop table " + tablename);
+    }
+
+    private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
+        String createTable;
+        createTable = "create table " + tablename + "(" + schema + ") ";
+        if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
+            createTable = createTable + "partitioned by (" + partitionedBy + ") ";
+        }
+        createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+            "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+        LOG.info("Creating table:\n {}", createTable);
+        CommandProcessorResponse result = driver.run(createTable);
+        int retCode = result.getResponseCode();
+        if (retCode != 0) {
+            throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + " " + result.getErrorMessage() + "]");
+        }
+    }
+
+    private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
+        createTable(tablename, schema, null);
+    }
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+
+        HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class);
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        driver = new Driver(hiveConf);
+        SessionState.start(new CliSessionState(hiveConf));
+        //props = new Properties();
+        //props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+
+    }
+
+    private static final TupleFactory tf = TupleFactory.getInstance();
+    private static final BagFactory bf = BagFactory.getInstance();
+
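+    // terse helpers for building nested test data: t(...) builds a Tuple, b(...) builds a DataBag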
+    private Tuple t(Object... objects) {
+        return tf.newTuple(Arrays.asList(objects));
+    }
+
+    private DataBag b(Tuple... objects) {
+        return bf.newDefaultBag(Arrays.asList(objects));
+    }
+
+    /**
+     * Tests an artificially complex nested schema to exercise nested schema conversion.
+     * @throws Exception
+     */
+    @Test
+    public void testSyntheticComplexSchema() throws Exception {
+        String pigSchema =
+                "a: " +
+                "(" +
+                "aa: chararray, " +
+                "ab: long, " +
+                "ac: map[], " +
+                "ad: { t: (ada: long) }, " +
+                "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
+                "af: (afa: chararray, afb: long) " +
+                ")," +
+                "b: chararray, " +
+                "c: long, " +
+                "d:  { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } ";
+
+        // with extra structs
+        String tableSchema =
+            "a struct<" +
+                "aa: string, " +
+                "ab: bigint, " +
+                "ac: map<string, string>, " +
+                "ad: array<struct<ada:bigint>>, " +
+                "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+                "af: struct<afa: string, afb: bigint> " +
+                ">, " +
+                "b string, " +
+                "c bigint, " +
+                "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<struct<dca: bigint>>>>";
+
+        // without extra structs
+        String tableSchema2 =
+            "a struct<" +
+                "aa: string, " +
+                "ab: bigint, " +
+                "ac: map<string, string>, " +
+                "ad: array<bigint>, " +
+                "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+                "af: struct<afa: string, afb: bigint> " +
+                ">, " +
+                "b string, " +
+                "c bigint, " +
+                "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<bigint>>>";
+
+        List<Tuple> data = new ArrayList<Tuple>();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = t(
+                t(
+                    "aa test",
+                    2L,
+                    new HashMap<String, String>() {
+                        {
+                            put("ac test1", "test 1");
+                            put("ac test2", "test 2");
+                        }
+                    },
+                    b(t(3L), t(4L)),
+                    b(t(5L, t("aeba test", 6L))),
+                    t("afa test", 7L)
+                ),
+                "b test",
+                (long) i,
+                b(t(8L, t("dba test", 9L), b(t(10L)))));
+
+            data.add(t);
+        }
+        verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, true);
+        verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, false);
+        verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, true);
+        verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, false);
+
+    }
+
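+    /**
+     * Round-trips the given tuples: loads them through MockLoader, stores them
+     * into a table of the given schema with HCatStorer, reads them back with
+     * HCatLoader, and asserts that both the data and the schema (ignoring field
+     * names) survive the round trip.
+     */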
+    private void verifyWriteRead(String tablename, String pigSchema, String tableSchema, List<Tuple> data, boolean provideSchemaToStorer)
+        throws IOException, CommandNeedRetryException, ExecException, FrontendException {
+        MockLoader.setData(tablename + "Input", data);
+        try {
+            createTable(tablename, tableSchema);
+            PigServer server = new PigServer(ExecType.LOCAL);
+            server.setBatchOn();
+            server.registerQuery("A = load '" + tablename + "Input' using org.apache.hcatalog.pig.MockLoader() AS (" + pigSchema + ");");
+            Schema dumpedASchema = server.dumpSchema("A");
+            server.registerQuery("STORE A into '" + tablename + "' using org.apache.hcatalog.pig.HCatStorer("
+                + (provideSchemaToStorer ? "'', '" + pigSchema + "'" : "")
+                + ");");
+
+            ExecJob execJob = server.executeBatch().get(0);
+            if (!execJob.getStatistics().isSuccessful()) {
+                throw new RuntimeException("Import failed", execJob.getException());
+            }
+            // test that schema was loaded correctly
+            server.registerQuery("X = load '" + tablename + "' using org.apache.hcatalog.pig.HCatLoader();");
+            server.dumpSchema("X");
+            Iterator<Tuple> it = server.openIterator("X");
+            int i = 0;
+            while (it.hasNext()) {
+                Tuple input = data.get(i++);
+                Tuple output = it.next();
+                Assert.assertEquals(input.toString(), output.toString());
+                LOG.info("tuple : {} ", output);
+            }
+            Schema dumpedXSchema = server.dumpSchema("X");
+
+            Assert.assertEquals(
+                "expected " + dumpedASchema + " but was " + dumpedXSchema + " (ignoring field names)",
+                "",
+                compareIgnoreFieldNames(dumpedASchema, dumpedXSchema));
+
+        } finally {
+            dropTable(tablename);
+        }
+    }
+
+    private String compareIgnoreFieldNames(Schema expected, Schema got) throws FrontendException {
+        if (expected == null || got == null) {
+            if (expected == got) {
+                return "";
+            } else {
+                return "\nexpected " + expected + " got " + got;
+            }
+        }
+        if (expected.size() != got.size()) {
+            return "\nsize expected " + expected.size() + " (" + expected + ") got " + got.size() + " (" + got + ")";
+        }
+        String message = "";
+        for (int i = 0; i < expected.size(); i++) {
+            FieldSchema expectedField = expected.getField(i);
+            FieldSchema gotField = got.getField(i);
+            if (expectedField.type != gotField.type) {
+                message += "\ntype expected " + expectedField.type + " (" + expectedField + ") got " + gotField.type + " (" + gotField + ")";
+            } else {
+                message += compareIgnoreFieldNames(expectedField.schema, gotField.schema);
+            }
+        }
+        return message;
+    }
+
+    /**
+     * Tests that unnecessary tuples are dropped while converting the schema
+     * (Pig requires Tuples in Bags)
+     * @throws Exception
+     */
+    @Test
+    public void testTupleInBagInTupleInBag() throws Exception {
+        String pigSchema = "a: { b : ( c: { d: (i : long) } ) }";
+
+        String tableSchema = "a array< array< bigint > >";
+
+        List<Tuple> data = new ArrayList<Tuple>();
+        data.add(t(b(t(b(t(100L), t(101L))), t(b(t(110L))))));
+        data.add(t(b(t(b(t(200L))), t(b(t(210L))), t(b(t(220L))))));
+        data.add(t(b(t(b(t(300L), t(301L))))));
+        data.add(t(b(t(b(t(400L))), t(b(t(410L), t(411L), t(412L))))));
+
+
+        verifyWriteRead("TupleInBagInTupleInBag1", pigSchema, tableSchema, data, true);
+        verifyWriteRead("TupleInBagInTupleInBag2", pigSchema, tableSchema, data, false);
+
+        // test that we don't drop the unnecessary tuple if the table has the corresponding Struct
+        String tableSchema2 = "a array< struct< c: array< struct< i: bigint > > > >";
+
+        verifyWriteRead("TupleInBagInTupleInBag3", pigSchema, tableSchema2, data, true);
+        verifyWriteRead("TupleInBagInTupleInBag4", pigSchema, tableSchema2, data, false);
+
+    }
+
+    @Test
+    public void testMapWithComplexData() throws Exception {
+        String pigSchema = "a: long, b: map[]";
+        String tableSchema = "a bigint, b map<string, struct<aa:bigint, ab:string>>";
+
+        List<Tuple> data = new ArrayList<Tuple>();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = t(
+                (long) i,
+                new HashMap<String, Object>() {
+                    {
+                        put("b test 1", t(1l, "test 1"));
+                        put("b test 2", t(2l, "test 2"));
+                    }
+                });
+
+            data.add(t);
+        }
+        verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true);
+        verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false);
+
+    }
+}

Added: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java (added)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.mapreduce.HCatBaseTest;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Tests that require both HCatLoader and HCatStorer. For read-only or write-only
+ * functionality, see {@link TestHCatLoader} or {@link TestHCatStorer}.
+ */
+public class TestHCatLoaderStorer extends HCatBaseTest {
+
+    /**
+     * Ensure Pig can read/write tinyint/smallint columns.
+     */
+    @Test
+    public void testSmallTinyInt() throws Exception {
+
+        String readTblName = "test_small_tiny_int";
+        File dataDir = new File(TEST_DATA_DIR + "/testSmallTinyIntData");
+        File dataFile = new File(dataDir, "testSmallTinyInt.tsv");
+
+        String writeTblName = "test_small_tiny_int_write";
+        File writeDataFile = new File(TEST_DATA_DIR, writeTblName + ".tsv");
+
+        FileUtil.fullyDelete(dataDir); // Might not exist
+        Assert.assertTrue(dataDir.mkdir());
+
+        HcatTestUtils.createTestDataFile(dataFile.getAbsolutePath(), new String[]{
+            String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE),
+            String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE)
+        });
+
+        // Create a table with smallint/tinyint columns, load data, and query from Hive.
+        Assert.assertEquals(0, driver.run("drop table if exists " + readTblName).getResponseCode());
+        Assert.assertEquals(0, driver.run("create external table " + readTblName +
+            " (my_small_int smallint, my_tiny_int tinyint)" +
+            " row format delimited fields terminated by '\t' stored as textfile").getResponseCode());
+        Assert.assertEquals(0, driver.run("load data local inpath '" +
+            dataDir.getAbsolutePath() + "' into table " + readTblName).getResponseCode());
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.registerQuery(
+            "data = load '" + readTblName + "' using org.apache.hcatalog.pig.HCatLoader();");
+
+        // Ensure Pig schema is correct.
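+        // Pig has no 16- or 8-bit integer types, so smallint and tinyint columns are
+        // expected to surface as Pig INTEGER fields.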
+        Schema schema = server.dumpSchema("data");
+        Assert.assertEquals(2, schema.getFields().size());
+        Assert.assertEquals("my_small_int", schema.getField(0).alias);
+        Assert.assertEquals(DataType.INTEGER, schema.getField(0).type);
+        Assert.assertEquals("my_tiny_int", schema.getField(1).alias);
+        Assert.assertEquals(DataType.INTEGER, schema.getField(1).type);
+
+        // Ensure Pig can read data correctly.
+        Iterator<Tuple> it = server.openIterator("data");
+        Tuple t = it.next();
+        Assert.assertEquals(new Integer(Short.MIN_VALUE), t.get(0));
+        Assert.assertEquals(new Integer(Byte.MIN_VALUE), t.get(1));
+        t = it.next();
+        Assert.assertEquals(new Integer(Short.MAX_VALUE), t.get(0));
+        Assert.assertEquals(new Integer(Byte.MAX_VALUE), t.get(1));
+        Assert.assertFalse(it.hasNext());
+
+        // Ensure Pig can write correctly to smallint/tinyint columns. This means values within the
+        // bounds of the column type are written, and values outside throw an exception.
+        Assert.assertEquals(0, driver.run("drop table if exists " + writeTblName).getResponseCode());
+        Assert.assertEquals(0, driver.run("create table " + writeTblName +
+            " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode());
+
+        // Values within the column type bounds.
+        HcatTestUtils.createTestDataFile(writeDataFile.getAbsolutePath(), new String[]{
+            String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE),
+            String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE)
+        });
+        smallTinyIntBoundsCheckHelper(writeDataFile.getAbsolutePath(), ExecJob.JOB_STATUS.COMPLETED);
+
+        // Values outside the column type bounds will fail at runtime.
+        HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/shortTooSmall.tsv", new String[]{
+            String.format("%d\t%d", Short.MIN_VALUE - 1, 0)});
+        smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/shortTooSmall.tsv", ExecJob.JOB_STATUS.FAILED);
+
+        HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/shortTooBig.tsv", new String[]{
+            String.format("%d\t%d", Short.MAX_VALUE + 1, 0)});
+        smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/shortTooBig.tsv", ExecJob.JOB_STATUS.FAILED);
+
+        HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/byteTooSmall.tsv", new String[]{
+            String.format("%d\t%d", 0, Byte.MIN_VALUE - 1)});
+        smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/byteTooSmall.tsv", ExecJob.JOB_STATUS.FAILED);
+
+        HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/byteTooBig.tsv", new String[]{
+            String.format("%d\t%d", 0, Byte.MAX_VALUE + 1)});
+        smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/byteTooBig.tsv", ExecJob.JOB_STATUS.FAILED);
+    }
+
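+    /**
+     * Loads the given file as int columns and stores it into a fresh smallint/tinyint
+     * table via HCatStorer, asserting that the batch job ends with the expected status
+     * (COMPLETED for in-range values, FAILED for out-of-range ones).
+     */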
+    private void smallTinyIntBoundsCheckHelper(String data, ExecJob.JOB_STATUS expectedStatus)
+        throws Exception {
+        Assert.assertEquals(0, driver.run("drop table if exists test_tbl").getResponseCode());
+        Assert.assertEquals(0, driver.run("create table test_tbl" +
+            " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode());
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("data = load '" + data +
+            "' using PigStorage('\t') as (my_small_int:int, my_tiny_int:int);");
+        server.registerQuery(
+            "store data into 'test_tbl' using org.apache.hcatalog.pig.HCatStorer();");
+        List<ExecJob> jobs = server.executeBatch();
+        Assert.assertEquals(expectedStatus, jobs.get(0).getStatus());
+    }
+}