Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2011/04/12 17:30:12 UTC

svn commit: r1091509 [8/8] - in /incubator/hcatalog/trunk: ./ bin/ ivy/ src/ src/docs/ src/docs/src/ src/docs/src/documentation/ src/docs/src/documentation/classes/ src/docs/src/documentation/conf/ src/docs/src/documentation/content/ src/docs/src/docum...

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,587 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.util.UDFContext;
+
+public class TestHCatStorer extends TestCase {
+
+  MiniCluster cluster = MiniCluster.buildCluster();
+  private Driver driver;
+  Properties props;
+
+  @Override
+  protected void setUp() throws Exception {
+
+    HiveConf hiveConf = new HiveConf(this.getClass());
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+    fullFileName = cluster.getProperties().getProperty("fs.default.name") + fileName;
+  }
+
+  String fileName = "/tmp/input.data";
+  String fullFileName;
+
+
+//  public void testStoreFuncMap() throws IOException{
+//
+//    driver.run("drop table junit_unparted");
+//    String createTable = "create table junit_unparted(b string,arr_of_maps array<map<string,string>>) stored as RCFILE " +
+//        "tblproperties('hcat.isd'='org.apache.hadoop.hive.hCatalog.rcfile.RCFileInputStorageDriver'," +
+//        "'hcat.osd'='org.apache.hadoop.hive.hCatalog.rcfile.RCFileOutputStorageDriver') ";
+//    int retCode = driver.run(createTable).getResponseCode();
+//    if(retCode != 0) {
+//      throw new IOException("Failed to create table.");
+//    }
+//
+//    MiniCluster.deleteFile(cluster, fileName);
+//    MiniCluster.createInputFile(cluster, fileName, new String[]{"test\t{([a#haddop,b#pig])}","data\t{([b#hive,a#howl])}"});
+//
+//    PigServer server = new PigServer(ExecType.LOCAL, props);
+//    UDFContext.getUDFContext().setClientSystemProps();
+//    server.setBatchOn();
+//    server.registerQuery("A = load '"+ fullFileName +"' as (b:chararray,arr_of_maps:bag{mytup:tuple ( mymap:map[ ])});");
+//    server.registerQuery("store A into 'default.junit_unparted' using org.apache.hadoop.hive.hCatalog.pig.HCatStorer('','b:chararray,arr_of_maps:bag{mytup:tuple ( mymap:map[ ])}');");
+//    server.executeBatch();
+//
+//
+//
+//    MiniCluster.deleteFile(cluster, fileName);
+//
+//    driver.run("select * from junit_unparted");
+//    ArrayList<String> res = new ArrayList<String>();
+//    driver.getResults(res);
+//    driver.run("drop table junit_unparted");
+//    Iterator<String> itr = res.iterator();
+//    System.out.println(itr.next());
+//    System.out.println(itr.next());
+//   assertFalse(itr.hasNext());
+//
+//  }
+
+  public void testPartColsInData() throws IOException{
+
+    driver.run("drop table junit_unparted");
+    String createTable = "create table junit_unparted(a int) partitioned by (b string) stored as RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+    MiniCluster.deleteFile(cluster, fileName);
+    int LOOP_SIZE = 11;
+    String[] input = new String[LOOP_SIZE];
+    for(int i = 0; i < LOOP_SIZE; i++) {
+        input[i] = i + "\t1";
+    }
+    MiniCluster.createInputFile(cluster, fileName, input);
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.registerQuery("A = load '"+fullFileName+"' as (a:int, b:chararray);");
+    server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('b=1');");
+    server.registerQuery("B = load 'default.junit_unparted' using "+HCatLoader.class.getName()+"();");
+    Iterator<Tuple> itr= server.openIterator("B");
+
+    int i = 0;
+
+    while(itr.hasNext()){
+      Tuple t = itr.next();
+      assertEquals(2, t.size());
+      assertEquals(t.get(0), i);
+      assertEquals(t.get(1), "1");
+      i++;
+    }
+
+    assertFalse(itr.hasNext());
+    assertEquals(11, i);
+    MiniCluster.deleteFile(cluster, fileName);
+  }
+
+  public void testMultiPartColsInData() throws IOException{
+
+    driver.run("drop table employee");
+    String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+    		" PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+    		"tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+
+    MiniCluster.deleteFile(cluster, fullFileName);
+    String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
+                          "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
+                          "111239\tSatya\t01/01/2001\tM\tIN\tKL",
+                          "111240\tKavya\t01/01/2002\tF\tIN\tAP"};
+
+    MiniCluster.createInputFile(cluster, fullFileName, inputData);
+    PigServer pig = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    pig.setBatchOn();
+    pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+    		"emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+    pig.registerQuery("TN = FILTER A BY emp_state == 'TN';");
+    pig.registerQuery("KA = FILTER A BY emp_state == 'KA';");
+    pig.registerQuery("KL = FILTER A BY emp_state == 'KL';");
+    pig.registerQuery("AP = FILTER A BY emp_state == 'AP';");
+    pig.registerQuery("STORE TN INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN,emp_state=TN');");
+    pig.registerQuery("STORE KA INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN,emp_state=KA');");
+    pig.registerQuery("STORE KL INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN,emp_state=KL');");
+    pig.registerQuery("STORE AP INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN,emp_state=AP');");
+    pig.executeBatch();
+    driver.run("select * from employee");
+    ArrayList<String> results = new ArrayList<String>();
+    driver.getResults(results);
+    assertEquals(4, results.size());
+    Collections.sort(results);
+    assertEquals(inputData[0], results.get(0));
+    assertEquals(inputData[1], results.get(1));
+    assertEquals(inputData[2], results.get(2));
+    assertEquals(inputData[3], results.get(3));
+    MiniCluster.deleteFile(cluster, fullFileName);
+    driver.run("drop table employee");
+  }
+
+  public void testStoreInPartitionedTbl() throws IOException{
+
+    driver.run("drop table junit_unparted");
+    String createTable = "create table junit_unparted(a int) partitioned by (b string) stored as RCFILE " +
+        "tblproperties('hcat.isd'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+    MiniCluster.deleteFile(cluster, fileName);
+    int LOOP_SIZE = 11;
+    String[] input = new String[LOOP_SIZE];
+    for(int i = 0; i < LOOP_SIZE; i++) {
+        input[i] = i+"";
+    }
+    MiniCluster.createInputFile(cluster, fileName, input);
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.registerQuery("A = load '"+fullFileName+"' as (a:int);");
+    server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('b=1');");
+    server.registerQuery("B = load 'default.junit_unparted' using "+HCatLoader.class.getName()+"();");
+    Iterator<Tuple> itr= server.openIterator("B");
+
+    int i = 0;
+
+    while(itr.hasNext()){
+      Tuple t = itr.next();
+      assertEquals(2, t.size());
+      assertEquals(t.get(0), i);
+      assertEquals(t.get(1), "1");
+      i++;
+    }
+
+    assertFalse(itr.hasNext());
+    assertEquals(11, i);
+    MiniCluster.deleteFile(cluster, fileName);
+  }
+
+  public void testNoAlias() throws IOException{
+    driver.run("drop table junit_parted");
+    String createTable = "create table junit_parted(a int, b string) partitioned by (ds string) stored as RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    boolean errCaught = false;
+    try{
+      server.setBatchOn();
+      server.registerQuery("A = load '"+ fullFileName +"' as (a:int, b:chararray);");
+      server.registerQuery("B = foreach A generate a+10, b;");
+      server.registerQuery("store B into 'junit_parted' using "+HCatStorer.class.getName()+"('ds=20100101');");
+      server.executeBatch();
+    }
+    catch(PigException fe){
+      PigException pe = LogUtils.getPigException(fe);
+      assertTrue(pe instanceof FrontendException);
+      assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode());
+      assertTrue(pe.getMessage().contains("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer."));
+      errCaught = true;
+    }
+    assertTrue(errCaught);
+    errCaught = false;
+    try{
+      server.setBatchOn();
+      server.registerQuery("A = load '"+ fullFileName +"' as (a:int, B:chararray);");
+      server.registerQuery("B = foreach A generate a, B;");
+      server.registerQuery("store B into 'junit_parted' using "+HCatStorer.class.getName()+"('ds=20100101');");
+      server.executeBatch();
+    }
+    catch(PigException fe){
+      PigException pe = LogUtils.getPigException(fe);
+      assertTrue(pe instanceof FrontendException);
+      assertEquals(PigHCatUtil.PIG_EXCEPTION_CODE, pe.getErrorCode());
+      assertTrue(pe.getMessage().contains("Column names should all be in lowercase. Invalid name found: B"));
+      errCaught = true;
+    }
+    driver.run("drop table junit_parted");
+    assertTrue(errCaught);
+  }
+
+  public void testStoreMultiTables() throws IOException{
+
+    driver.run("drop table junit_unparted");
+    String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+    driver.run("drop table junit_unparted2");
+    createTable = "create table junit_unparted2(a int, b string) stored as RCFILE " +
+    "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+    "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+    retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+
+    MiniCluster.deleteFile(cluster, fileName);
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+    int k = 0;
+    for(int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for(int j=1;j<=LOOP_SIZE;j++) {
+        input[k++] = si + "\t"+j;
+      }
+    }
+    MiniCluster.createInputFile(cluster, fileName, input);
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+ fullFileName +"' as (a:int, b:chararray);");
+    server.registerQuery("B = filter A by a < 2;");
+    server.registerQuery("store B into 'junit_unparted' using "+HCatStorer.class.getName()+"();");
+    server.registerQuery("C = filter A by a >= 2;");
+    server.registerQuery("store C into 'junit_unparted2' using "+HCatStorer.class.getName()+"();");
+    server.executeBatch();
+    MiniCluster.deleteFile(cluster, fileName);
+
+    driver.run("select * from junit_unparted");
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    driver.run("select * from junit_unparted2");
+    ArrayList<String> res2 = new ArrayList<String>();
+    driver.getResults(res2);
+
+    res.addAll(res2);
+    driver.run("drop table junit_unparted");
+    driver.run("drop table junit_unparted2");
+
+    Iterator<String> itr = res.iterator();
+    for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+      assertEquals( input[i] ,itr.next());
+    }
+
+    assertFalse(itr.hasNext());
+
+  }
+
+  public void testStoreWithNoSchema() throws IOException{
+
+    driver.run("drop table junit_unparted");
+    String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+    MiniCluster.deleteFile(cluster, fileName);
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+    int k = 0;
+    for(int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for(int j=1;j<=LOOP_SIZE;j++) {
+        input[k++] = si + "\t"+j;
+      }
+    }
+    MiniCluster.createInputFile(cluster, fileName, input);
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+ fullFileName +"' as (a:int, b:chararray);");
+    server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('');");
+    server.executeBatch();
+    MiniCluster.deleteFile(cluster, fileName);
+
+    driver.run("select * from junit_unparted");
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    driver.run("drop table junit_unparted");
+    Iterator<String> itr = res.iterator();
+    for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+      assertEquals( input[i] ,itr.next());
+    }
+
+    assertFalse(itr.hasNext());
+
+  }
+
+  public void testStoreWithNoCtorArgs() throws IOException{
+
+    driver.run("drop table junit_unparted");
+    String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+    MiniCluster.deleteFile(cluster, fileName);
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+    int k = 0;
+    for(int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for(int j=1;j<=LOOP_SIZE;j++) {
+        input[k++] = si + "\t"+j;
+      }
+    }
+    MiniCluster.createInputFile(cluster, fileName, input);
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+ fullFileName +"' as (a:int, b:chararray);");
+    server.registerQuery("store A into 'junit_unparted' using "+HCatStorer.class.getName()+"();");
+    server.executeBatch();
+    MiniCluster.deleteFile(cluster, fileName);
+
+    driver.run("select * from junit_unparted");
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    driver.run("drop table junit_unparted");
+    Iterator<String> itr = res.iterator();
+    for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+      assertEquals( input[i] ,itr.next());
+    }
+
+    assertFalse(itr.hasNext());
+
+  }
+
+  public void testEmptyStore() throws IOException{
+
+    driver.run("drop table junit_unparted");
+    String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+    MiniCluster.deleteFile(cluster, fileName);
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+    int k = 0;
+    for(int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for(int j=1;j<=LOOP_SIZE;j++) {
+        input[k++] = si + "\t"+j;
+      }
+    }
+    MiniCluster.createInputFile(cluster, fileName, input);
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+fullFileName+"' as (a:int, b:chararray);");
+    server.registerQuery("B = filter A by a > 100;");
+    server.registerQuery("store B into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','a:int,b:chararray');");
+    server.executeBatch();
+    MiniCluster.deleteFile(cluster, fileName);
+
+    driver.run("select * from junit_unparted");
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    driver.run("drop table junit_unparted");
+    Iterator<String> itr = res.iterator();
+    assertFalse(itr.hasNext());
+
+  }
+
+  public void testBagNStruct() throws IOException{
+  driver.run("drop table junit_unparted");
+  String createTable = "create table junit_unparted(b string,a struct<a1:int>,  arr_of_struct array<string>, " +
+  		"arr_of_struct2 array<struct<s1:string,s2:string>>,  arr_of_struct3 array<struct<s3:string>>) stored as RCFILE " +
+      "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+      "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+  int retCode = driver.run(createTable).getResponseCode();
+  if(retCode != 0) {
+    throw new IOException("Failed to create table.");
+  }
+
+  MiniCluster.deleteFile(cluster, fileName);
+  MiniCluster.createInputFile(cluster, fileName, new String[]{"zookeeper\t(2)\t{(pig)}\t{(pnuts,hdfs)}\t{(hadoop),(howl)}",
+      "chubby\t(2)\t{(sawzall)}\t{(bigtable,gfs)}\t{(mapreduce),(howl)}"});
+
+  PigServer server = new PigServer(ExecType.LOCAL, props);
+  UDFContext.getUDFContext().setClientSystemProps();
+  server.setBatchOn();
+  server.registerQuery("A = load '"+fullFileName+"' as (b:chararray, a:tuple(a1:int), arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)});");
+  server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','b:chararray, a:tuple(a1:int)," +
+  		" arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)}');");
+  server.executeBatch();
+
+
+
+  MiniCluster.deleteFile(cluster, fileName);
+
+  driver.run("select * from junit_unparted");
+  ArrayList<String> res = new ArrayList<String>();
+  driver.getResults(res);
+  driver.run("drop table junit_unparted");
+  Iterator<String> itr = res.iterator();
+  assertEquals("zookeeper\t{\"a1\":2}\t[\"pig\"]\t[{\"s1\":\"pnuts\",\"s2\":\"hdfs\"}]\t[{\"s3\":\"hadoop\"},{\"s3\":\"howl\"}]", itr.next());
+  assertEquals("chubby\t{\"a1\":2}\t[\"sawzall\"]\t[{\"s1\":\"bigtable\",\"s2\":\"gfs\"}]\t[{\"s3\":\"mapreduce\"},{\"s3\":\"howl\"}]",itr.next());
+ assertFalse(itr.hasNext());
+
+  }
+
+  public void testStoreFuncAllSimpleTypes() throws IOException{
+
+    driver.run("drop table junit_unparted");
+    String createTable = "create table junit_unparted(a int, b float, c double, d bigint, e string) stored as RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+    MiniCluster.deleteFile(cluster, fileName);
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+    for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+      input[i] = i + "\t" + i * 2.1f +"\t"+ i*1.1d + "\t" + i * 2L +"\t"+"lets howl";
+    }
+
+    MiniCluster.createInputFile(cluster, fileName, input);
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+fullFileName+"' as (a:int, b:float, c:double, d:long, e:chararray);");
+    server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','a:int, b:float, c:double, d:long, e:chararray');");
+    server.executeBatch();
+    MiniCluster.deleteFile(cluster, fileName);
+
+    driver.run("select * from junit_unparted");
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+
+    Iterator<String> itr = res.iterator();
+    for(int i = 0; i < LOOP_SIZE*LOOP_SIZE; i++) {
+      assertEquals( input[i] ,itr.next());
+    }
+
+    assertFalse(itr.hasNext());
+    driver.run("drop table junit_unparted");
+  }
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    MiniCluster.deleteFile(cluster, fileName);
+  }
+
+
+
+
+  public void testStoreFuncSimple() throws IOException{
+
+    driver.run("drop table junit_unparted");
+    String createTable = "create table junit_unparted(a int, b string) stored as RCFILE " +
+    		"tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+    		"'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+    MiniCluster.deleteFile(cluster, fileName);
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+    int k = 0;
+    for(int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for(int j=1;j<=LOOP_SIZE;j++) {
+        input[k++] = si + "\t"+j;
+      }
+    }
+    MiniCluster.createInputFile(cluster, fileName, input);
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+fullFileName+"' as (a:int, b:chararray);");
+    server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','a:int,b:chararray');");
+    server.executeBatch();
+    MiniCluster.deleteFile(cluster, fileName);
+
+    driver.run("select * from junit_unparted");
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    driver.run("drop table junit_unparted");
+    Iterator<String> itr = res.iterator();
+    for(int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for(int j=1;j<=LOOP_SIZE;j++) {
+        assertEquals( si + "\t"+j,itr.next());
+      }
+    }
+   assertFalse(itr.hasNext());
+
+  }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorerMulti.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorerMulti.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorerMulti.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorerMulti.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.data.Pair;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.util.UDFContext;
+
+public class TestHCatStorerMulti extends TestCase {
+
+  private static final String BASIC_TABLE = "junit_unparted_basic";
+  private static final String PARTITIONED_TABLE = "junit_parted_basic";
+  private static MiniCluster cluster = MiniCluster.buildCluster();
+  private static Driver driver;
+
+  private static final String basicFile = "/tmp/basic.input.data";
+  private static String basicFileFullName;
+  private static Properties props;
+
+  private static Map<Integer,Pair<Integer,String>> basicInputData;
+
+  private void dropTable(String tablename) throws IOException{
+    driver.run("drop table "+tablename);
+  }
+  private void createTable(String tablename, String schema, String partitionedBy) throws IOException{
+    String createTable;
+    createTable = "create table "+tablename+"("+schema+") ";
+    if ((partitionedBy != null)&&(!partitionedBy.trim().isEmpty())){
+      createTable = createTable + "partitioned by ("+partitionedBy+") ";
+    }
+    createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+    "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+"]");
+    }
+  }
+
+  private void createTable(String tablename, String schema) throws IOException{
+    createTable(tablename,schema,null);
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    if (driver == null){
+      HiveConf hiveConf = new HiveConf(this.getClass());
+      hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+      hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+      hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+      driver = new Driver(hiveConf);
+      SessionState.start(new CliSessionState(hiveConf));
+    }
+
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+    basicFileFullName = cluster.getProperties().getProperty("fs.default.name") + basicFile;
+
+    cleanup();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    cleanup();
+  }
+
+  public void testStoreBasicTable() throws Exception {
+
+
+    createTable(BASIC_TABLE,"a int, b string");
+
+    populateBasicFile();
+
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+basicFileFullName+"' as (a:int, b:chararray);");
+    server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+
+    server.executeBatch();
+
+    driver.run("select * from "+BASIC_TABLE);
+    ArrayList<String> unpartitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(unpartitionedTableValuesReadFromHiveDriver);
+    assertEquals(basicInputData.size(),unpartitionedTableValuesReadFromHiveDriver.size());
+  }
+
+  public void testStorePartitionedTable() throws Exception {
+    createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
+
+    populateBasicFile();
+
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+basicFileFullName+"' as (a:int, b:chararray);");
+
+    server.registerQuery("B2 = filter A by a < 2;");
+    server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
+    server.registerQuery("C2 = filter A by a >= 2;");
+    server.registerQuery("store C2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
+
+    server.executeBatch();
+
+    driver.run("select * from "+PARTITIONED_TABLE);
+    ArrayList<String> partitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(partitionedTableValuesReadFromHiveDriver);
+    assertEquals(basicInputData.size(),partitionedTableValuesReadFromHiveDriver.size());
+  }
+
+  public void testStoreTableMulti() throws Exception {
+
+
+    createTable(BASIC_TABLE,"a int, b string");
+    createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
+
+    populateBasicFile();
+
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+basicFileFullName+"' as (a:int, b:chararray);");
+    server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+
+    server.registerQuery("B2 = filter A by a < 2;");
+    server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
+    server.registerQuery("C2 = filter A by a >= 2;");
+    server.registerQuery("store C2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
+
+    server.executeBatch();
+
+    driver.run("select * from "+BASIC_TABLE);
+    ArrayList<String> unpartitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(unpartitionedTableValuesReadFromHiveDriver);
+    driver.run("select * from "+PARTITIONED_TABLE);
+    ArrayList<String> partitionedTableValuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(partitionedTableValuesReadFromHiveDriver);
+    assertEquals(basicInputData.size(),unpartitionedTableValuesReadFromHiveDriver.size());
+    assertEquals(basicInputData.size(),partitionedTableValuesReadFromHiveDriver.size());
+  }
+
+  private void populateBasicFile() throws IOException {
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+    basicInputData = new HashMap<Integer,Pair<Integer,String>>();
+    int k = 0;
+    for(int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for(int j=1;j<=LOOP_SIZE;j++) {
+        String sj = "S"+j+"S";
+        input[k] = si + "\t" + sj;
+        basicInputData.put(k, new Pair<Integer,String>(i,sj));
+        k++;
+      }
+    }
+    MiniCluster.createInputFile(cluster, basicFile, input);
+  }
+
+  private void cleanup() throws IOException {
+    MiniCluster.deleteFile(cluster, basicFile);
+    dropTable(BASIC_TABLE);
+    dropTable(PARTITIONED_TABLE);
+  }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPermsInheritance.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPermsInheritance.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPermsInheritance.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPermsInheritance.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hcatalog.ExitException;
+import org.apache.hcatalog.NoExitSecurityManager;
+import org.apache.hcatalog.cli.HCatCli;
+import org.apache.hcatalog.pig.HCatStorer;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TException;
+
+public class TestPermsInheritance extends TestCase {
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    securityManager = System.getSecurityManager();
+    System.setSecurityManager(new NoExitSecurityManager());
+    msc = new HiveMetaStoreClient(conf);
+    msc.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,"testNoPartTbl", true,true);
+    System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+    System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+    msc.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,"testPartTbl", true,true);
+    pig = new PigServer(ExecType.LOCAL, conf.getAllProperties());
+    UDFContext.getUDFContext().setClientSystemProps();
+  }
+
+  private HiveMetaStoreClient msc;
+  private SecurityManager securityManager;
+  private PigServer pig;
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    System.setSecurityManager(securityManager);
+  }
+
+  private final HiveConf conf = new HiveConf(this.getClass());
+
+  public void testNoPartTbl() throws IOException, MetaException, UnknownTableException, TException, NoSuchObjectException{
+
+    try{
+      HCatCli.main(new String[]{"-e","create table testNoPartTbl (line string) stored as RCFILE", "-p","rwx-wx---"});
+    }
+    catch(Exception e){
+      assertTrue(e instanceof ExitException);
+      assertEquals(((ExitException)e).getStatus(), 0);
+    }
+    Warehouse wh = new Warehouse(conf);
+    Path dfsPath = wh.getDefaultTablePath(MetaStoreUtils.DEFAULT_DATABASE_NAME, "testNoPartTbl");
+    FileSystem fs = dfsPath.getFileSystem(conf);
+    assertEquals(fs.getFileStatus(dfsPath).getPermission(),FsPermission.valueOf("drwx-wx---"));
+
+    pig.setBatchOn();
+    pig.registerQuery("A  = load 'build.xml' as (line:chararray);");
+    pig.registerQuery("store A into 'testNoPartTbl' using "+HCatStorer.class.getName()+"();");
+    pig.executeBatch();
+    FileStatus[] status = fs.listStatus(dfsPath,hiddenFileFilter);
+
+    assertEquals(status.length, 1);
+    assertEquals(FsPermission.valueOf("drwx-wx---"),status[0].getPermission());
+
+    try{
+      HCatCli.main(new String[]{"-e","create table testPartTbl (line string)  partitioned by (a string) stored as RCFILE", "-p","rwx-wx--x"});
+    }
+    catch(Exception e){
+      assertTrue(e instanceof ExitException);
+      assertEquals(((ExitException)e).getStatus(), 0);
+    }
+
+    dfsPath = wh.getDefaultTablePath(MetaStoreUtils.DEFAULT_DATABASE_NAME, "testPartTbl");
+    assertEquals(fs.getFileStatus(dfsPath).getPermission(),FsPermission.valueOf("drwx-wx--x"));
+
+    pig.setBatchOn();
+    pig.registerQuery("A  = load 'build.xml' as (line:chararray);");
+    pig.registerQuery("store A into 'testPartTbl' using "+HCatStorer.class.getName()+"('a=part');");
+    pig.executeBatch();
+
+    Path partPath = new Path(dfsPath,"a=part");
+    assertEquals(FsPermission.valueOf("drwx-wx--x"),fs.getFileStatus(partPath).getPermission());
+    status = fs.listStatus(partPath,hiddenFileFilter);
+    assertEquals(status.length, 1);
+    assertEquals(FsPermission.valueOf("drwx-wx--x"),status[0].getPermission());
+  }
+
+  private static final PathFilter hiddenFileFilter = new PathFilter(){
+    public boolean accept(Path p){
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.pig.HCatLoader;
+import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TException;
+
+public class TestPigStorageDriver extends TestCase {
+
+  private HiveConf howlConf;
+  private Driver howlDriver;
+  private HiveMetaStoreClient msc;
+
+  @Override
+  protected void setUp() throws Exception {
+
+    howlConf = new HiveConf(this.getClass());
+    howlConf.set(ConfVars.PREEXECHOOKS.varname, "");
+    howlConf.set(ConfVars.POSTEXECHOOKS.varname, "");
+    howlConf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    howlConf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName());
+    howlDriver = new Driver(howlConf);
+    msc = new HiveMetaStoreClient(howlConf);
+    SessionState.start(new CliSessionState(howlConf));
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  public void testPigStorageDriver() throws IOException{
+
+
+    String fsLoc = howlConf.get("fs.default.name");
+    Path tblPath = new Path(fsLoc, "/tmp/test_pig/data");
+    String anyExistingFileInCurDir = "ivy.xml";
+    tblPath.getFileSystem(howlConf).copyFromLocalFile(new Path(anyExistingFileInCurDir),tblPath);
+
+    howlDriver.run("drop table junit_pigstorage");
+    CommandProcessorResponse resp;
+    String createTable = "create table junit_pigstorage (a string) partitioned by (b string) stored as RCFILE";
+
+    resp = howlDriver.run(createTable);
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+
+    resp = howlDriver.run("alter table junit_pigstorage add partition (b='2010-10-10') location '"+new Path(fsLoc, "/tmp/test_pig")+"'");
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+
+    resp = howlDriver.run("alter table junit_pigstorage partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
+        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+PigStorageInputDriver.class.getName()+"' outputdriver 'non-existent'");
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+
+    resp =  howlDriver.run("desc extended junit_pigstorage partition (b='2010-10-10')");
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+
+    PigServer server = new PigServer(ExecType.LOCAL, howlConf.getAllProperties());
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.registerQuery(" a = load 'junit_pigstorage' using "+HCatLoader.class.getName()+";");
+    Iterator<Tuple> itr = server.openIterator("a");
+    DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(new File(anyExistingFileInCurDir))));
+    while(itr.hasNext()){
+      Tuple t = itr.next();
+      assertEquals(2, t.size());
+      if(t.get(0) != null) {
+        // If the underlying data field is empty, PigStorage inserts null
+        // instead of empty String objects.
+        assertTrue(t.get(0) instanceof String);
+        assertEquals(stream.readLine(), t.get(0));
+      }
+      else{
+        assertTrue(stream.readLine().isEmpty());
+      }
+      assertTrue(t.get(1) instanceof String);
+
+      assertEquals("2010-10-10", t.get(1));
+    }
+    assertEquals(0,stream.available());
+    stream.close();
+    howlDriver.run("drop table junit_pigstorage");
+  }
+
+  public void testDelim() throws MetaException, TException, UnknownTableException, NoSuchObjectException, InvalidOperationException, IOException{
+
+    howlDriver.run("drop table junit_pigstorage_delim");
+
+    CommandProcessorResponse resp;
+    String createTable = "create table junit_pigstorage_delim (a string) partitioned by (b string) stored as RCFILE";
+
+    resp = howlDriver.run(createTable);
+
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+
+    resp = howlDriver.run("alter table junit_pigstorage_delim add partition (b='2010-10-10')");
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+
+    resp = howlDriver.run("alter table junit_pigstorage_delim partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
+        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+MyPigStorageDriver.class.getName()+"' outputdriver 'non-existent'");
+
+    Partition part = msc.getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", "b=2010-10-10");
+    Map<String,String> partParms = part.getParameters();
+    partParms.put(PigStorageInputDriver.delim, "control-A");
+
+    msc.alter_partition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", part);
+
+    PigServer server = new PigServer(ExecType.LOCAL, howlConf.getAllProperties());
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.registerQuery(" a = load 'junit_pigstorage_delim' using "+HCatLoader.class.getName()+";");
+    try{
+      server.openIterator("a");
+    }catch(FrontendException fe){}
+  }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.rcfile;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+
+public class TestRCFileInputStorageDriver extends TestCase{
+
+  private static Configuration conf = new Configuration();
+
+  private static Path file;
+
+  private static FileSystem fs;
+
+   static {
+
+    try {
+      fs = FileSystem.getLocal(conf);
+      Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
+      file = new Path(dir, "test_rcfile");
+      fs.delete(dir, true);
+    } catch (Exception e) {
+    }
+  }
+
+  public void testConvertValueToTuple() throws IOException,InterruptedException{
+    fs.delete(file, true);
+
+    byte[][] record_1 = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+        "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+        "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+        new byte[0], "\\N".getBytes("UTF-8")};
+    byte[][] record_2 = {"100".getBytes("UTF-8"), "200".getBytes("UTF-8"),
+        "123".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+        "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+        new byte[0], "\\N".getBytes("UTF-8")};
+
+    RCFileOutputFormat.setColumnNumber(conf, 8);
+    RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null,
+        new DefaultCodec());
+    BytesRefArrayWritable bytes = new BytesRefArrayWritable(record_1.length);
+    for (int i = 0; i < record_1.length; i++) {
+      BytesRefWritable cu = new BytesRefWritable(record_1[i], 0,
+          record_1[i].length);
+      bytes.set(i, cu);
+    }
+    writer.append(bytes);
+    BytesRefArrayWritable bytes2 = new BytesRefArrayWritable(record_2.length);
+    for (int i = 0; i < record_2.length; i++) {
+      BytesRefWritable cu = new BytesRefWritable(record_2[i], 0,
+          record_2[i].length);
+      bytes2.set(i, cu);
+    }
+    writer.append(bytes2);
+    writer.close();
+    BytesRefArrayWritable[] bytesArr = new BytesRefArrayWritable[]{bytes,bytes2};
+
+    HCatSchema schema = buildHiveSchema();
+    RCFileInputDriver sd = new RCFileInputDriver();
+    JobContext jc = new JobContext(conf, new JobID());
+    sd.setInputPath(jc, file.toString());
+    InputFormat<?,?> iF = sd.getInputFormat(null);
+    InputSplit split = iF.getSplits(jc).get(0);
+    sd.setOriginalSchema(jc, schema);
+    sd.setOutputSchema(jc, schema);
+    sd.initialize(jc, getProps());
+
+    TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+    RecordReader<?,?> rr = iF.createRecordReader(split,tac);
+    rr.initialize(split, tac);
+    HCatRecord[] tuples = getExpectedRecords();
+    for(int j=0; j < 2; j++){
+      Assert.assertTrue(rr.nextKeyValue());
+      BytesRefArrayWritable w = (BytesRefArrayWritable)rr.getCurrentValue();
+      Assert.assertEquals(bytesArr[j], w);
+      HCatRecord t = sd.convertToHCatRecord(null,w);
+      Assert.assertEquals(8, t.size());
+      Assert.assertEquals(t,tuples[j]);
+    }
+  }
+
+  public void testPruning() throws IOException,InterruptedException{
+    fs.delete(file, true);
+
+    byte[][] record_1 = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+        "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+        "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+        new byte[0], "\\N".getBytes("UTF-8")};
+    byte[][] record_2 = {"100".getBytes("UTF-8"), "200".getBytes("UTF-8"),
+        "123".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+        "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+        new byte[0], "\\N".getBytes("UTF-8")};
+
+    RCFileOutputFormat.setColumnNumber(conf, 8);
+    RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null,
+        new DefaultCodec());
+    BytesRefArrayWritable bytes = new BytesRefArrayWritable(record_1.length);
+    for (int i = 0; i < record_1.length; i++) {
+      BytesRefWritable cu = new BytesRefWritable(record_1[i], 0,
+          record_1[i].length);
+      bytes.set(i, cu);
+    }
+    writer.append(bytes);
+    BytesRefArrayWritable bytes2 = new BytesRefArrayWritable(record_2.length);
+    for (int i = 0; i < record_2.length; i++) {
+      BytesRefWritable cu = new BytesRefWritable(record_2[i], 0,
+          record_2[i].length);
+      bytes2.set(i, cu);
+    }
+    writer.append(bytes2);
+    writer.close();
+    BytesRefArrayWritable[] bytesArr = new BytesRefArrayWritable[]{bytes,bytes2};
+
+    RCFileInputDriver sd = new RCFileInputDriver();
+    JobContext jc = new JobContext(conf, new JobID());
+    sd.setInputPath(jc, file.toString());
+    InputFormat<?,?> iF = sd.getInputFormat(null);
+    InputSplit split = iF.getSplits(jc).get(0);
+    sd.setOriginalSchema(jc, buildHiveSchema());
+    sd.setOutputSchema(jc, buildPrunedSchema());
+
+    sd.initialize(jc, getProps());
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+    TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+    RecordReader<?,?> rr = iF.createRecordReader(split,tac);
+    rr.initialize(split, tac);
+    HCatRecord[] tuples = getPrunedRecords();
+    for(int j=0; j < 2; j++){
+      Assert.assertTrue(rr.nextKeyValue());
+      BytesRefArrayWritable w = (BytesRefArrayWritable)rr.getCurrentValue();
+      Assert.assertFalse(bytesArr[j].equals(w));
+      Assert.assertEquals(8, w.size());
+      HCatRecord t = sd.convertToHCatRecord(null,w);
+      Assert.assertEquals(5, t.size());
+      Assert.assertEquals(tuples[j], t);
+    }
+    assertFalse(rr.nextKeyValue());
+  }
+
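+  // Reads the same 8-column rows with an output schema that reorders the columns,
+  // injects the partition column "part1" and adds a new column "newCol": the
+  // converted HCatRecord has 7 fields in the requested order, with the partition
+  // value filled in and null for the column that does not exist in the data.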
+  public void testReorderedCols() throws IOException, InterruptedException{
+    fs.delete(file, true);
+
+    byte[][] record_1 = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+        "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+        "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+        new byte[0], "\\N".getBytes("UTF-8")};
+    byte[][] record_2 = {"100".getBytes("UTF-8"), "200".getBytes("UTF-8"),
+        "123".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+        "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+        new byte[0], "\\N".getBytes("UTF-8")};
+
+    RCFileOutputFormat.setColumnNumber(conf, 8);
+    RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null,
+        new DefaultCodec());
+    BytesRefArrayWritable bytes = new BytesRefArrayWritable(record_1.length);
+    for (int i = 0; i < record_1.length; i++) {
+      BytesRefWritable cu = new BytesRefWritable(record_1[i], 0,
+          record_1[i].length);
+      bytes.set(i, cu);
+    }
+    writer.append(bytes);
+    BytesRefArrayWritable bytes2 = new BytesRefArrayWritable(record_2.length);
+    for (int i = 0; i < record_2.length; i++) {
+      BytesRefWritable cu = new BytesRefWritable(record_2[i], 0,
+          record_2[i].length);
+      bytes2.set(i, cu);
+    }
+    writer.append(bytes2);
+    writer.close();
+    BytesRefArrayWritable[] bytesArr = new BytesRefArrayWritable[]{bytes,bytes2};
+
+    RCFileInputDriver sd = new RCFileInputDriver();
+    JobContext jc = new JobContext(conf, new JobID());
+    sd.setInputPath(jc, file.toString());
+    InputFormat<?,?> iF = sd.getInputFormat(null);
+    InputSplit split = iF.getSplits(jc).get(0);
+    sd.setOriginalSchema(jc, buildHiveSchema());
+    sd.setOutputSchema(jc, buildReorderedSchema());
+
+    sd.initialize(jc, getProps());
+    Map<String,String> map = new HashMap<String,String>(1);
+    map.put("part1", "first-part");
+    sd.setPartitionValues(jc, map);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+    TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+    RecordReader<?,?> rr = iF.createRecordReader(split,tac);
+    rr.initialize(split, tac);
+    HCatRecord[] tuples = getReorderedCols();
+    for(int j=0; j < 2; j++){
+      Assert.assertTrue(rr.nextKeyValue());
+      BytesRefArrayWritable w = (BytesRefArrayWritable)rr.getCurrentValue();
+      Assert.assertFalse(bytesArr[j].equals(w));
+      Assert.assertEquals(8, w.size());
+      HCatRecord t = sd.convertToHCatRecord(null,w);
+      Assert.assertEquals(7, t.size());
+      Assert.assertEquals(tuples[j], t);
+    }
+    assertFalse(rr.nextKeyValue());
+  }
+
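+  // Expected HCatRecords for record_1 and record_2 under the full 8-column schema;
+  // the empty field and the "\N" marker both convert to null.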
+  private HCatRecord[] getExpectedRecords(){
+
+    List<Object> rec_1 = new ArrayList<Object>(8);
+    rec_1.add(new Byte("123"));
+    rec_1.add(new Short("456"));
+    rec_1.add( new Integer(789));
+    rec_1.add( new Long(1000L));
+    rec_1.add( new Double(5.3D));
+    rec_1.add( new String("howl and hadoop"));
+    rec_1.add( null);
+    rec_1.add( null);
+
+    HCatRecord tup_1 = new DefaultHCatRecord(rec_1);
+
+    List<Object> rec_2 = new ArrayList<Object>(8);
+    rec_2.add( new Byte("100"));
+    rec_2.add( new Short("200"));
+    rec_2.add( new Integer(123));
+    rec_2.add( new Long(1000L));
+    rec_2.add( new Double(5.3D));
+    rec_2.add( new String("howl and hadoop"));
+    rec_2.add( null);
+    rec_2.add( null);
+    HCatRecord tup_2 = new DefaultHCatRecord(rec_2);
+
+    return  new HCatRecord[]{tup_1,tup_2};
+
+  }
+
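+  // Expected HCatRecords for the pruned schema: only atinyint, aint, adouble,
+  // astring and anullint survive.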
+  private HCatRecord[] getPrunedRecords(){
+
+    List<Object> rec_1 = new ArrayList<Object>(8);
+    rec_1.add(new Byte("123"));
+    rec_1.add( new Integer(789));
+    rec_1.add( new Double(5.3D));
+    rec_1.add( new String("howl and hadoop"));
+    rec_1.add( null);
+    HCatRecord tup_1 = new DefaultHCatRecord(rec_1);
+
+    List<Object> rec_2 = new ArrayList<Object>(8);
+    rec_2.add( new Byte("100"));
+    rec_2.add( new Integer(123));
+    rec_2.add( new Double(5.3D));
+    rec_2.add( new String("howl and hadoop"));
+    rec_2.add( null);
+    HCatRecord tup_2 = new DefaultHCatRecord(rec_2);
+
+    return  new HCatRecord[]{tup_1,tup_2};
+
+  }
+
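+  // Full on-disk schema: the 8 columns, in the order they are written to the RCFile.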
+  private HCatSchema buildHiveSchema() throws HCatException{
+
+    List<FieldSchema> fields = new ArrayList<FieldSchema>(8);
+    fields.add(new FieldSchema("atinyint", "tinyint", ""));
+    fields.add(new FieldSchema("asmallint", "smallint", ""));
+    fields.add(new FieldSchema("aint", "int", ""));
+    fields.add(new FieldSchema("along", "bigint", ""));
+    fields.add(new FieldSchema("adouble", "double", ""));
+    fields.add(new FieldSchema("astring", "string", ""));
+    fields.add(new FieldSchema("anullint", "int", ""));
+    fields.add(new FieldSchema("anullstring", "string", ""));
+
+    return new HCatSchema(HCatUtil.getHCatFieldSchemaList(fields));
+  }
+
+  private HCatSchema buildPrunedSchema() throws HCatException{
+
+    List<FieldSchema> fields = new ArrayList<FieldSchema>(5);
+    fields.add(new FieldSchema("atinyint", "tinyint", ""));
+    fields.add(new FieldSchema("aint", "int", ""));
+    fields.add(new FieldSchema("adouble", "double", ""));
+    fields.add(new FieldSchema("astring", "string", ""));
+    fields.add(new FieldSchema("anullint", "int", ""));
+
+    return new HCatSchema(HCatUtil.getHCatFieldSchemaList(fields));
+  }
+
+  private HCatSchema buildReorderedSchema() throws HCatException{
+
+    List<FieldSchema> fields = new ArrayList<FieldSchema>(7);
+
+    fields.add(new FieldSchema("aint", "int", ""));
+    fields.add(new FieldSchema("part1", "string", ""));
+    fields.add(new FieldSchema("adouble", "double", ""));
+    fields.add(new FieldSchema("newCol", "tinyint", ""));
+    fields.add(new FieldSchema("astring", "string", ""));
+    fields.add(new FieldSchema("atinyint", "tinyint", ""));
+    fields.add(new FieldSchema("anullint", "int", ""));
+
+
+    return new HCatSchema(HCatUtil.getHCatFieldSchemaList(fields));
+  }
+
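+  // Expected HCatRecords in the reordered schema's column order, including the
+  // injected partition value "first-part" and a null for the added "newCol" column.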
+  private HCatRecord[] getReorderedCols(){
+
+    List<Object> rec_1 = new ArrayList<Object>(7);
+    rec_1.add( new Integer(789));
+    rec_1.add( new String("first-part"));
+    rec_1.add( new Double(5.3D));
+    rec_1.add( null); // new column
+    rec_1.add( new String("howl and hadoop"));
+    rec_1.add( new Byte("123"));
+    rec_1.add( null);
+    HCatRecord tup_1 = new DefaultHCatRecord(rec_1);
+
+    List<Object> rec_2 = new ArrayList<Object>(7);
+
+    rec_2.add( new Integer(123));
+    rec_2.add( new String("first-part"));
+    rec_2.add( new Double(5.3D));
+    rec_2.add(null);
+    rec_2.add( new String("howl and hadoop"));
+    rec_2.add( new Byte("100"));
+    rec_2.add( null);
+    HCatRecord tup_2 = new DefaultHCatRecord(rec_2);
+
+    return  new HCatRecord[]{tup_1,tup_2};
+
+  }
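+
+  // SerDe properties handed to the input driver; "\N" matches the null marker
+  // written for the anullstring column in the test rows.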
+  private Properties getProps(){
+    Properties props = new Properties();
+    props.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "\\N");
+    props.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    return props;
+  }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.rcfile;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.rcfile.RCFileMapReduceInputFormat;
+
+/**
+ * Tests reading RCFiles through RCFileMapReduceInputFormat (the new mapreduce
+ * API), in particular how input splits line up with RCFile sync points.
+ */
+public class TestRCFileMapReduceInputFormat extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(TestRCFileMapReduceInputFormat.class);
+
+    private static Configuration conf = new Configuration();
+
+    private static ColumnarSerDe serDe;
+
+    private static Path file;
+
+    private static FileSystem fs;
+
+    private static Properties tbl;
+
+    static {
+      try {
+        fs = FileSystem.getLocal(conf);
+        Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
+        file = new Path(dir, "test_rcfile");
+        fs.delete(dir, true);
+        // the SerDe part is from TestLazySimpleSerDe
+        serDe = new ColumnarSerDe();
+        // Create the SerDe
+        tbl = createProperties();
+        serDe.initialize(conf, tbl);
+      } catch (Exception e) {
+        // Setup failures are deferred; tests that rely on the SerDe will fail loudly.
+      }
+    }
+
+    private static BytesRefArrayWritable partialS = new BytesRefArrayWritable();
+
+    private static byte[][] bytesArray = null;
+
+    private static BytesRefArrayWritable s = null;
+    static {
+      try {
+        bytesArray = new byte[][] {"123".getBytes("UTF-8"),
+            "456".getBytes("UTF-8"), "789".getBytes("UTF-8"),
+            "1000".getBytes("UTF-8"), "5.3".getBytes("UTF-8"),
+            "hive and hadoop".getBytes("UTF-8"), new byte[0],
+            "NULL".getBytes("UTF-8")};
+        s = new BytesRefArrayWritable(bytesArray.length);
+        s.set(0, new BytesRefWritable("123".getBytes("UTF-8")));
+        s.set(1, new BytesRefWritable("456".getBytes("UTF-8")));
+        s.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
+        s.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
+        s.set(4, new BytesRefWritable("5.3".getBytes("UTF-8")));
+        s.set(5, new BytesRefWritable("hive and hadoop".getBytes("UTF-8")));
+        s.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
+        s.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
+
+        // partial test init
+        partialS.set(0, new BytesRefWritable("NULL".getBytes("UTF-8")));
+        partialS.set(1, new BytesRefWritable("NULL".getBytes("UTF-8")));
+        partialS.set(2, new BytesRefWritable("789".getBytes("UTF-8")));
+        partialS.set(3, new BytesRefWritable("1000".getBytes("UTF-8")));
+        partialS.set(4, new BytesRefWritable("NULL".getBytes("UTF-8")));
+        partialS.set(5, new BytesRefWritable("NULL".getBytes("UTF-8")));
+        partialS.set(6, new BytesRefWritable("NULL".getBytes("UTF-8")));
+        partialS.set(7, new BytesRefWritable("NULL".getBytes("UTF-8")));
+
+      } catch (UnsupportedEncodingException e) {
+        // UTF-8 is always supported, so this cannot happen.
+      }
+    }
+
+
+    /** For debugging and testing. */
+    public static void main(String[] args) throws Exception {
+      int count = 10000;
+      boolean create = true;
+
+      String usage = "Usage: RCFile " + "[-count N]" + " file";
+      if (args.length == 0) {
+        System.err.println(usage);
+        System.exit(-1);
+      }
+
+      try {
+        for (int i = 0; i < args.length; ++i) { // parse command line
+          if (args[i] == null) {
+            continue;
+          } else if (args[i].equals("-count")) {
+            count = Integer.parseInt(args[++i]);
+          } else {
+            // file is required parameter
+            file = new Path(args[i]);
+          }
+        }
+
+        if (file == null) {
+          System.err.println(usage);
+          System.exit(-1);
+        }
+
+        LOG.info("count = " + count);
+        LOG.info("create = " + create);
+        LOG.info("file = " + file);
+
+           // test.performanceTest();
+        System.out.println("Finished.");
+      } finally {
+        fs.close();
+      }
+    }
+
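+    // Table properties for the ColumnarSerDe: the 8-column layout used by every
+    // row in these tests, with "NULL" as the null marker.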
+    private static Properties createProperties() {
+      Properties tbl = new Properties();
+
+      // Set the configuration parameters
+      tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+      tbl.setProperty("columns",
+          "abyte,ashort,aint,along,adouble,astring,anullint,anullstring");
+      tbl.setProperty("columns.types",
+          "tinyint:smallint:int:bigint:double:string:int:string");
+      tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+      return tbl;
+    }
+
+
+
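+    // Exercises split computation with max split sizes chosen to land before,
+    // right before, inside, right after and well after an RCFile sync point,
+    // verifying the split count and that no records are lost or duplicated.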
+    public void testSynAndSplit() throws IOException, InterruptedException {
+      splitBeforeSync();
+      splitRightBeforeSync();
+      splitInMiddleOfSync();
+      splitRightAfterSync();
+      splitAfterSync();
+    }
+
+    private void splitBeforeSync() throws IOException,InterruptedException {
+      writeThenReadByRecordReader(600, 1000, 2, 17684, null);
+    }
+
+    private void splitRightBeforeSync() throws IOException ,InterruptedException{
+      writeThenReadByRecordReader(500, 1000, 2, 17750, null);
+    }
+
+    private void splitInMiddleOfSync() throws IOException,InterruptedException {
+      writeThenReadByRecordReader(500, 1000, 2, 17760, null);
+
+    }
+
+    private void splitRightAfterSync() throws IOException, InterruptedException {
+      writeThenReadByRecordReader(500, 1000, 2, 17770, null);
+    }
+
+    private void splitAfterSync() throws IOException ,InterruptedException{
+      writeThenReadByRecordReader(500, 1000, 2, 19950, null);
+    }
+
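+    /**
+     * Writes writeCount copies of a single row with the RCFile record interval
+     * set to intervalRecordCount, then reads the file back through
+     * RCFileMapReduceInputFormat capped at maxSplitSize, asserting that
+     * splitNumber splits are produced and that every written row is read back.
+     */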
+    private void writeThenReadByRecordReader(int intervalRecordCount,
+        int writeCount, int splitNumber, long maxSplitSize, CompressionCodec codec)
+        throws IOException, InterruptedException {
+      Path testDir = new Path(System.getProperty("test.data.dir", ".")
+          + "/mapred/testsmallfirstsplit");
+      Path testFile = new Path(testDir, "test_rcfile");
+      fs.delete(testFile, true);
+      Configuration cloneConf = new Configuration(conf);
+      RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
+      cloneConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, intervalRecordCount);
+
+      RCFile.Writer writer = new RCFile.Writer(fs, cloneConf, testFile, null, codec);
+
+      BytesRefArrayWritable bytes = new BytesRefArrayWritable(bytesArray.length);
+      for (int i = 0; i < bytesArray.length; i++) {
+        BytesRefWritable cu = null;
+        cu = new BytesRefWritable(bytesArray[i], 0, bytesArray[i].length);
+        bytes.set(i, cu);
+      }
+      for (int i = 0; i < writeCount; i++) {
+        writer.append(bytes);
+      }
+      writer.close();
+
+      RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable> inputFormat = new RCFileMapReduceInputFormat<LongWritable, BytesRefArrayWritable>();
+      Configuration jobConf = new Configuration(cloneConf);
+      jobConf.set("mapred.input.dir", testDir.toString());
+      JobContext context = new Job(jobConf);
+      context.getConfiguration().setLong("mapred.max.split.size",maxSplitSize);
+      List<InputSplit> splits = inputFormat.getSplits(context);
+      assertEquals("splits length should be " + splitNumber, splitNumber, splits.size());
+      int readCount = 0;
+      for (int i = 0; i < splits.size(); i++) {
+        TaskAttemptContext tac = new TaskAttemptContext(jobConf, new TaskAttemptID());
+        RecordReader<LongWritable, BytesRefArrayWritable> rr = inputFormat.createRecordReader(splits.get(i), tac);
+        rr.initialize(splits.get(i), tac);
+        while (rr.nextKeyValue()) {
+          readCount++;
+        }
+      }
+      assertEquals("readCount should be equal to writeCount", writeCount, readCount);
+    }
+
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hcatalog.rcfile;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+
+public class TestRCFileOutputStorageDriver extends TestCase {
+
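+  // Round trip: the RCFile input driver turns the raw BytesRefArrayWritable into
+  // an HCatRecord, the output driver converts it back, and the two byte arrays
+  // must compare equal.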
+  public void testConversion() throws IOException {
+    Configuration conf = new Configuration();
+    JobContext jc = new JobContext(conf, new JobID());
+
+    HCatSchema schema = buildHiveSchema();
+    HCatInputStorageDriver isd = new RCFileInputDriver();
+
+    isd.setOriginalSchema(jc, schema);
+    isd.setOutputSchema(jc, schema);
+    isd.initialize(jc, new Properties());
+
+    byte[][] byteArray = buildBytesArray();
+
+    BytesRefArrayWritable bytesWritable = new BytesRefArrayWritable(byteArray.length);
+    for (int i = 0; i < byteArray.length; i++) {
+      BytesRefWritable cu = new BytesRefWritable(byteArray[i], 0, byteArray[i].length);
+      bytesWritable.set(i, cu);
+    }
+
+    // Convert the byte array to an HCatRecord with the input driver, convert it
+    // back to a byte array with the output driver, and compare the two.
+    HCatRecord record = isd.convertToHCatRecord(null, bytesWritable);
+
+    HCatOutputStorageDriver osd = new RCFileOutputDriver();
+
+    osd.setSchema(jc, schema);
+    osd.initialize(jc, new Properties());
+
+    BytesRefArrayWritable bytesWritableOutput = (BytesRefArrayWritable) osd.convertValue(record);
+
+    assertTrue(bytesWritableOutput.compareTo(bytesWritable) == 0);
+  }
+
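+  // The same 8 column values used by the input-driver tests, with an empty field
+  // and "\N" standing in for the two nullable columns.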
+  private byte[][] buildBytesArray() throws UnsupportedEncodingException {
+    byte[][] bytes = {"123".getBytes("UTF-8"), "456".getBytes("UTF-8"),
+        "789".getBytes("UTF-8"), "1000".getBytes("UTF-8"),
+        "5.3".getBytes("UTF-8"), "howl and hadoop".getBytes("UTF-8"),
+        new byte[0], "\\N".getBytes("UTF-8") };
+    return bytes;
+  }
+
+  private HCatSchema buildHiveSchema() throws HCatException{
+
+    List<FieldSchema> fields = new ArrayList<FieldSchema>(8);
+    fields.add(new FieldSchema("atinyint", "tinyint", ""));
+    fields.add(new FieldSchema("asmallint", "smallint", ""));
+    fields.add(new FieldSchema("aint", "int", ""));
+    fields.add(new FieldSchema("along", "bigint", ""));
+    fields.add(new FieldSchema("adouble", "double", ""));
+    fields.add(new FieldSchema("astring", "string", ""));
+    fields.add(new FieldSchema("anullint", "int", ""));
+    fields.add(new FieldSchema("anullstring", "string", ""));
+
+    return new HCatSchema(HCatUtil.getHCatFieldSchemaList(fields));
+  }
+}