Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2011/04/12 17:30:12 UTC

svn commit: r1091509 [7/8] - in /incubator/hcatalog/trunk: ./ bin/ ivy/ src/ src/docs/ src/docs/src/ src/docs/src/documentation/ src/docs/src/documentation/classes/ src/docs/src/documentation/conf/ src/docs/src/documentation/content/ src/docs/src/docum...

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/schema/TestHCatSchemaUtils.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data.schema;
+
+import java.io.PrintStream;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Category;
+
+public class TestHCatSchemaUtils extends TestCase {
+
+    public void testSimpleOperation() throws Exception{
+        String typeString = "struct<name:string,studentid:int,"
+            + "contact:struct<phno:string,email:string>,"
+            + "currently_registered_courses:array<string>,"
+            + "current_grades:map<string,string>,"
+            + "phnos:array<struct<phno:string,type:string>>,blah:array<int>>";
+
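+        // Convert the same type string through Hive's TypeInfoUtils and through
+        // HCatSchemaUtils; both string forms should round-trip to the original.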
+        TypeInfo ti = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
+
+        HCatSchema hsch = HCatSchemaUtils.getHCatSchemaFromTypeString(typeString);
+        System.out.println(ti.getTypeName());
+        System.out.println(hsch.toString());
+        assertEquals(ti.getTypeName(),hsch.toString());
+        assertEquals(hsch.toString(),typeString);
+    }
+
+    @SuppressWarnings("unused")
+    private void pretty_print(PrintStream pout, HCatSchema hsch) throws HCatException {
+        pretty_print(pout,hsch,"");
+    }
+
+
+    private void pretty_print(PrintStream pout, HCatSchema hsch, String prefix) throws HCatException {
+        int i = 0;
+        for (HCatFieldSchema field : hsch.getFields()){
+            pretty_print(pout,field,prefix+"."+(field.getName()==null?i:field.getName()));
+            i++;
+        }
+    }
+    private void pretty_print(PrintStream pout, HCatFieldSchema hfsch, String prefix) throws HCatException {
+
+        Category tcat = hfsch.getCategory();
+        if (Category.STRUCT == tcat){
+            pretty_print(pout,hfsch.getStructSubSchema(),prefix);
+        }else if (Category.ARRAY == tcat){
+            pretty_print(pout,hfsch.getArrayElementSchema(),prefix);
+        }else if (Category.MAP == tcat){
+            pout.println(prefix + ".mapkey:\t" + hfsch.getMapKeyType().toString());
+            pretty_print(pout,hfsch.getMapValueSchema(),prefix+".mapvalue:");
+        }else{
+            pout.println(prefix + "\t" + hfsch.getType().toString());
+        }
+    }
+
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+
+/**
+ * Base test for HCatOutputFormat: writes a partition using HCatOutputFormat, reads
+ * it back using HCatInputFormat, and checks the column values and counts.
+ */
+public abstract class HCatMapReduceTest extends TestCase {
+
+  protected String dbName = "default";
+  protected String tableName = "testHowlMapReduceTable";
+
+  protected String inputFormat = RCFileInputFormat.class.getName();
+  protected String outputFormat = RCFileOutputFormat.class.getName();
+  protected String inputSD = RCFileInputDriver.class.getName();
+  protected String outputSD = RCFileOutputDriver.class.getName();
+  protected String serdeClass = ColumnarSerDe.class.getName();
+
+  private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
+  private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
+
+  protected abstract void initialize() throws Exception;
+
+  protected abstract List<FieldSchema> getPartitionKeys();
+
+  protected abstract List<FieldSchema> getTableColumns();
+
+  private HiveMetaStoreClient client;
+  protected HiveConf hiveConf;
+
+  private FileSystem fs;
+  private String thriftUri = null;
+
+  protected Driver driver;
+
+  @Override
+  protected void setUp() throws Exception {
+    hiveConf = new HiveConf(this.getClass());
+
+    //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
+    //is present only in the ql/test directory
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+
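+    // If HCAT_METASTORE_URI is set, use a remote Thrift metastore; otherwise the
+    // embedded (local) metastore is used.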
+    thriftUri = System.getenv("HCAT_METASTORE_URI");
+
+    if( thriftUri != null ) {
+      System.out.println("Using URI " + thriftUri);
+
+      hiveConf.set("hive.metastore.local", "false");
+      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+    }
+
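+    // Use a LocalFileSystem initialized at the current working directory so the
+    // tests run without a real HDFS cluster.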
+    fs = new LocalFileSystem();
+    fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+
+    initialize();
+
+    client = new HiveMetaStoreClient(hiveConf, null);
+    initTable();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+
+      client.dropTable(databaseName, tableName);
+    } catch(Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+
+    client.close();
+  }
+
+
+
+  private void initTable() throws Exception {
+
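+    // Create a fresh managed table with the subclass-provided columns and partition
+    // keys, using the configured input/output formats and HCatalog storage drivers
+    // (RCFile by default).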
+    String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+
+    try {
+      client.dropTable(databaseName, tableName);
+    } catch(Exception e) {
+    } //can fail with NoSuchObjectException
+
+
+    Table tbl = new Table();
+    tbl.setDbName(databaseName);
+    tbl.setTableName(tableName);
+    tbl.setTableType("MANAGED_TABLE");
+    StorageDescriptor sd = new StorageDescriptor();
+
+    sd.setCols(getTableColumns());
+    tbl.setPartitionKeys(getPartitionKeys());
+
+    tbl.setSd(sd);
+
+    sd.setBucketCols(new ArrayList<String>(2));
+    sd.setSerdeInfo(new SerDeInfo());
+    sd.getSerdeInfo().setName(tbl.getTableName());
+    sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+    sd.getSerdeInfo().getParameters().put(
+        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+    sd.getSerdeInfo().setSerializationLib(serdeClass);
+    sd.setInputFormat(inputFormat);
+    sd.setOutputFormat(outputFormat);
+
+    Map<String, String> tableParams = new HashMap<String, String>();
+    tableParams.put(HCatConstants.HCAT_ISD_CLASS, inputSD);
+    tableParams.put(HCatConstants.HCAT_OSD_CLASS, outputSD);
+    tbl.setParameters(tableParams);
+
+    client.createTable(tbl);
+  }
+
+  //Create test input file with specified number of rows
+  private void createInputFile(Path path, int rowCount) throws IOException {
+
+    if( fs.exists(path) ) {
+      fs.delete(path, true);
+    }
+
+    FSDataOutputStream os = fs.create(path);
+
+    for(int i = 0;i < rowCount;i++) {
+      os.writeChars(i + "\n");
+    }
+
+    os.close();
+  }
+
+  public static class MapCreate extends
+  Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+    static int writeCount = 0; //test will be in local mode
+
+    @Override
+    public void map(LongWritable key, Text value, Context context
+    ) throws IOException, InterruptedException {
+      {
+        try {
+          HCatRecord rec = writeRecords.get(writeCount);
+          context.write(null, rec);
+          writeCount++;
+
+        }catch(Exception e) {
+
+          e.printStackTrace(System.err); //print since otherwise exception is lost
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+
+  public static class MapRead extends
+  Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+
+    static int readCount = 0; //test will be in local mode
+
+    @Override
+    public void map(WritableComparable key, HCatRecord value, Context context
+    ) throws IOException, InterruptedException {
+      {
+        try {
+          readRecords.add(value);
+          readCount++;
+        } catch(Exception e) {
+          e.printStackTrace(); //print since otherwise exception is lost
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+
+  void runMRCreate(Map<String, String> partitionValues,
+        List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
+        int writeCount) throws Exception {
+
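+    // Map-only job: write 'writeCount' records through HCatOutputFormat into the
+    // given partition, then assert that the mapper wrote every record.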
+    writeRecords = records;
+    MapCreate.writeCount = 0;
+
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "howl mapreduce write test");
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(HCatMapReduceTest.MapCreate.class);
+
+    // input/output settings
+    job.setInputFormatClass(TextInputFormat.class);
+
+    Path path = new Path(fs.getWorkingDirectory(), "mapred/testHowlMapReduceInput");
+    createInputFile(path, writeCount);
+
+    TextInputFormat.setInputPaths(job, path);
+
+    job.setOutputFormatClass(HCatOutputFormat.class);
+
+    HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(thriftUri, null, dbName, tableName, partitionValues);
+    HCatOutputFormat.setOutput(job, outputInfo);
+
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+    job.setNumReduceTasks(0);
+
+    HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
+
+    //new HowlOutputCommitter(null).setupJob(job);
+    job.waitForCompletion(true);
+    new HCatOutputCommitter(null).cleanupJob(job);
+    Assert.assertEquals(writeCount, MapCreate.writeCount);
+  }
+
+  List<HCatRecord> runMRRead(int readCount) throws Exception {
+    return runMRRead(readCount, null);
+  }
+
+  List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
+
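+    // Map-only job: read the table through HCatInputFormat, optionally with a
+    // partition filter, and collect every record seen by the mapper.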
+    MapRead.readCount = 0;
+    readRecords.clear();
+
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "howl mapreduce read test");
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(HCatMapReduceTest.MapRead.class);
+
+    // input/output settings
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
+          thriftUri, null, dbName, tableName, filter);
+    HCatInputFormat.setInput(job, inputInfo);
+
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(Text.class);
+
+    job.setNumReduceTasks(0);
+
+    Path path = new Path(fs.getWorkingDirectory(), "mapred/testHowlMapReduceOutput");
+    if( fs.exists(path) ) {
+      fs.delete(path, true);
+    }
+
+    TextOutputFormat.setOutputPath(job, path);
+
+    job.waitForCompletion(true);
+    Assert.assertEquals(readCount, MapRead.readCount);
+
+    return readRecords;
+  }
+
+
+  protected HCatSchema getTableSchema() throws Exception {
+
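+    // Configure a read job only so that HCatInputFormat can report the table
+    // schema; the job itself is never submitted.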
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "howl mapreduce read schema test");
+    job.setJarByClass(this.getClass());
+
+    // input/output settings
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(thriftUri, null, dbName, tableName);
+    HCatInputFormat.setInput(job, inputInfo);
+
+    return HCatInputFormat.getTableSchema(job);
+  }
+
+}
+
+
+

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.pig.HCatLoader;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+
+public class TestHCatHiveCompatibility extends TestCase {
+
+  MiniCluster cluster = MiniCluster.buildCluster();
+  private Driver driver;
+  Properties props;
+
+  private HiveMetaStoreClient client;
+
+  String fileName = "/tmp/input.data";
+  String fullFileName;
+
+  @Override
+  protected void setUp() throws Exception {
+
+    HiveConf hiveConf = new HiveConf(this.getClass());
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    driver = new Driver(hiveConf);
+    client = new HiveMetaStoreClient(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+    fullFileName = cluster.getProperties().getProperty("fs.default.name") + fileName;
+
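+    // Recreate the 11-row tab-delimited input file (i, 1) on the mini cluster.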
+    MiniCluster.deleteFile(cluster, fileName);
+    int LOOP_SIZE = 11;
+    String[] input = new String[LOOP_SIZE];
+    for(int i = 0; i < LOOP_SIZE; i++) {
+        input[i] = i + "\t1";
+    }
+    MiniCluster.createInputFile(cluster, fileName, input);
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    MiniCluster.deleteFile(cluster, fileName);
+  }
+
+  public void testUnpartedReadWrite() throws Exception{
+
+    driver.run("drop table junit_unparted_noisd");
+    String createTable = "create table junit_unparted_noisd(a int) stored as RCFILE";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+
+    // assert that the table created has no howl instrumentation, and that we're still able to read it.
+    Table table = client.getTable("default", "junit_unparted_noisd");
+    assertFalse(table.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+    assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
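+    // Store into the plain Hive table with HCatStorer, then read it back with HCatLoader.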
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.registerQuery("A = load '"+fullFileName+"' as (a:int);");
+    server.registerQuery("store A into 'default.junit_unparted_noisd' using org.apache.hcatalog.pig.HCatStorer();");
+    server.registerQuery("B = load 'default.junit_unparted_noisd' using "+HCatLoader.class.getName()+"();");
+    Iterator<Tuple> itr= server.openIterator("B");
+
+    int i = 0;
+
+    while(itr.hasNext()){
+      Tuple t = itr.next();
+      assertEquals(1, t.size());
+      assertEquals(t.get(0), i);
+      i++;
+    }
+
+    assertFalse(itr.hasNext());
+    assertEquals(11, i);
+
+    // assert that the table created still has no howl instrumentation
+    Table table2 = client.getTable("default", "junit_unparted_noisd");
+    assertFalse(table2.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+    assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+    driver.run("drop table junit_unparted_noisd");
+  }
+
+  public void testPartedRead() throws Exception{
+
+    driver.run("drop table junit_parted_noisd");
+    String createTable = "create table junit_parted_noisd(a int) partitioned by (b string) stored as RCFILE";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+
+    // assert that the table created has no howl instrumentation, and that we're still able to read it.
+    Table table = client.getTable("default", "junit_parted_noisd");
+
+    assertFalse(table.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+    assertTrue(table.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
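+    // Store one partition (b=42) with HCatStorer, then read the whole table back with HCatLoader.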
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.registerQuery("A = load '"+fullFileName+"' as (a:int);");
+    server.registerQuery("store A into 'default.junit_parted_noisd' using org.apache.hcatalog.pig.HCatStorer('b=42');");
+    server.registerQuery("B = load 'default.junit_parted_noisd' using "+HCatLoader.class.getName()+"();");
+    Iterator<Tuple> itr= server.openIterator("B");
+
+    int i = 0;
+
+    while(itr.hasNext()){
+      Tuple t = itr.next();
+      assertEquals(2, t.size());
+      assertEquals(t.get(0), i);
+      assertEquals(t.get(1), "42");
+      i++;
+    }
+
+    assertFalse(itr.hasNext());
+    assertEquals(11, i);
+
+    // assert that the table created still has no howl instrumentation
+    Table table2 = client.getTable("default", "junit_parted_noisd");
+    assertFalse(table2.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+    assertTrue(table2.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+
+    // assert that there is one partition present, and it had howl instrumentation inserted when it was created.
+    Partition ptn = client.getPartition("default", "junit_parted_noisd", Arrays.asList("42"));
+
+    assertNotNull(ptn);
+    assertTrue(ptn.getParameters().containsKey(HCatConstants.HCAT_ISD_CLASS));
+    assertTrue(ptn.getSd().getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS));
+    driver.run("drop table junit_unparted_noisd");
+  }
+
+
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+public class TestHCatNonPartitioned extends HCatMapReduceTest {
+
+  private List<HCatRecord> writeRecords;
+  List<HCatFieldSchema> partitionColumns;
+
+  @Override
+  protected void initialize() throws HCatException {
+
+    dbName = null; //test if null dbName works ("default" is used)
+    tableName = "testHowlNonPartitionedTable";
+
+    writeRecords = new ArrayList<HCatRecord>();
+
+    for(int i = 0;i < 20;i++) {
+      List<Object> objList = new ArrayList<Object>();
+
+      objList.add(i);
+      objList.add("strvalue" + i);
+      writeRecords.add(new DefaultHCatRecord(objList));
+    }
+
+    partitionColumns = new ArrayList<HCatFieldSchema>();
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+  }
+
+  @Override
+  protected List<FieldSchema> getPartitionKeys() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    //empty list, non partitioned
+    return fields;
+  }
+
+  @Override
+  protected List<FieldSchema> getTableColumns() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+    fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+
+  public void testHowlNonPartitionedTable() throws Exception {
+
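+    // Publish 10 rows to the non-partitioned table, then check that a duplicate
+    // publish and a publish with a bogus partition key both fail.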
+    Map<String, String> partitionMap = new HashMap<String, String>();
+    runMRCreate(null, partitionColumns, writeRecords, 10);
+
+    //Test for duplicate publish
+    IOException exc = null;
+    try {
+      runMRCreate(null,  partitionColumns, writeRecords, 20);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    assertTrue(exc != null);
+    assertTrue(exc instanceof HCatException);
+    assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
+
+    //Test for publish with invalid partition key name
+    exc = null;
+    partitionMap.clear();
+    partitionMap.put("px", "p1value2");
+
+    try {
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    assertTrue(exc != null);
+    assertTrue(exc instanceof HCatException);
+    assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+    //Read should get 10 rows
+    runMRRead(10);
+
+    hiveReadTest();
+  }
+
+  //Test that data inserted through HCatOutputFormat is readable from Hive
+  private void hiveReadTest() throws Exception {
+
+    String query = "select * from " + tableName;
+    int retCode = driver.run(query).getResponseCode();
+
+    if( retCode != 0 ) {
+      throw new Exception("Error " + retCode + " running query " + query);
+    }
+
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    assertEquals(10, res.size());
+  }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+
+public class TestHCatOutputFormat extends TestCase {
+  private HiveMetaStoreClient client;
+  private HiveConf hiveConf;
+
+  private static final String dbName = "howlOutputFormatTestDB";
+  private static final String tblName = "howlOutputFormatTestTable";
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    hiveConf = new HiveConf(this.getClass());
+
+    try {
+      client = new HiveMetaStoreClient(hiveConf, null);
+
+      initTable();
+    } catch (Throwable e) {
+      System.err.println("Unable to open the metastore");
+      System.err.println(StringUtils.stringifyException(e));
+      throw new Exception(e);
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      super.tearDown();
+      client.dropTable(dbName, tblName);
+      client.dropDatabase(dbName);
+
+      client.close();
+    } catch (Throwable e) {
+      System.err.println("Unable to close metastore");
+      System.err.println(StringUtils.stringifyException(e));
+      throw new Exception(e);
+    }
+  }
+
+  private void initTable() throws Exception {
+
+    try {
+      client.dropTable(dbName, tblName);
+    } catch(Exception e) {}
+    try {
+      client.dropDatabase(dbName);
+    } catch(Exception e) {}
+    client.createDatabase(new Database(dbName, "", null,null));
+    assertNotNull((client.getDatabase(dbName).getLocationUri()));
+
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("colname", Constants.STRING_TYPE_NAME, ""));
+
+    Table tbl = new Table();
+    tbl.setDbName(dbName);
+    tbl.setTableName(tblName);
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setCols(fields);
+    tbl.setSd(sd);
+
+    //sd.setLocation("hdfs://tmp");
+    sd.setParameters(new HashMap<String, String>());
+    sd.getParameters().put("test_param_1", "Use this for comments etc");
+    sd.setBucketCols(new ArrayList<String>(2));
+    sd.getBucketCols().add("name");
+    sd.setSerdeInfo(new SerDeInfo());
+    sd.getSerdeInfo().setName(tbl.getTableName());
+    sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+    sd.getSerdeInfo().getParameters().put(
+        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+    sd.getSerdeInfo().setSerializationLib(
+        org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+    tbl.setPartitionKeys(fields);
+
+    Map<String, String> tableParams = new HashMap<String, String>();
+    tableParams.put(HCatConstants.HCAT_OSD_CLASS, RCFileOutputDriver.class.getName());
+    tableParams.put(HCatConstants.HCAT_ISD_CLASS, "testInputClass");
+    tableParams.put("hcat.testarg", "testArgValue");
+
+    tbl.setParameters(tableParams);
+
+    client.createTable(tbl);
+    Path tblPath = new Path(client.getTable(dbName, tblName).getSd().getLocation());
+    assertTrue(tblPath.getFileSystem(hiveConf).mkdirs(new Path(tblPath,"colname=p1")));
+
+  }
+
+  public void testSetOutput() throws Exception {
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "test outputformat");
+
+    Map<String, String> partitionValues = new HashMap<String, String>();
+    partitionValues.put("colname", "p1");
+    //null server url means local mode
+    HCatTableInfo info = HCatTableInfo.getOutputTableInfo(null, null, dbName, tblName, partitionValues);
+
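+    // setOutput is expected to record the partition values, table schema and
+    // storer info so that getJobInfo can return them later.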
+    HCatOutputFormat.setOutput(job, info);
+    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(job);
+
+    assertNotNull(jobInfo.getTableInfo());
+    assertEquals(1, jobInfo.getTableInfo().getPartitionValues().size());
+    assertEquals("p1", jobInfo.getTableInfo().getPartitionValues().get("colname"));
+    assertEquals(1, jobInfo.getTableSchema().getFields().size());
+    assertEquals("colname", jobInfo.getTableSchema().getFields().get(0).getName());
+
+    StorerInfo storer = jobInfo.getStorerInfo();
+    assertEquals(RCFileOutputDriver.class.getName(), storer.getOutputSDClass());
+
+    publishTest(job);
+  }
+
+  public void publishTest(Job job) throws Exception {
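+    // cleanupJob publishes the partition to the metastore; the partition should
+    // then exist and carry the storer information set on the table.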
+    OutputCommitter committer = new HCatOutputCommitter(null);
+    committer.cleanupJob(job);
+
+    Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
+    assertNotNull(part);
+
+    StorerInfo storer = InitializeInput.extractStorerInfo(part.getSd(),part.getParameters());
+    assertEquals(storer.getInputSDClass(), "testInputClass");
+    assertEquals(storer.getProperties().get("hcat.testarg"), "testArgValue");
+    assertTrue(part.getSd().getLocation().indexOf("p1") != -1);
+  }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+public class TestHCatPartitioned extends HCatMapReduceTest {
+
+  private List<HCatRecord> writeRecords;
+  private List<HCatFieldSchema> partitionColumns;
+
+  @Override
+  protected void initialize() throws Exception {
+
+    tableName = "testHowlPartitionedTable";
+    writeRecords = new ArrayList<HCatRecord>();
+
+    for(int i = 0;i < 20;i++) {
+      List<Object> objList = new ArrayList<Object>();
+
+      objList.add(i);
+      objList.add("strvalue" + i);
+      writeRecords.add(new DefaultHCatRecord(objList));
+    }
+
+    partitionColumns = new ArrayList<HCatFieldSchema>();
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+  }
+
+
+  @Override
+  protected List<FieldSchema> getPartitionKeys() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+  @Override
+  protected List<FieldSchema> getTableColumns() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+    fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+
+  public void testHowlPartitionedTable() throws Exception {
+
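+    // Publish two partitions (10 rows into part1=p1value1, 20 into PART1=p1value2),
+    // then exercise the error cases and read everything back.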
+    Map<String, String> partitionMap = new HashMap<String, String>();
+    partitionMap.put("part1", "p1value1");
+
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+
+    partitionMap.clear();
+    partitionMap.put("PART1", "p1value2");
+
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+
+    //Test for duplicate publish
+    IOException exc = null;
+    try {
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    assertTrue(exc != null);
+    assertTrue(exc instanceof HCatException);
+    assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType());
+
+    //Test for publish with invalid partition key name
+    exc = null;
+    partitionMap.clear();
+    partitionMap.put("px", "p1value2");
+
+    try {
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    assertTrue(exc != null);
+    assertTrue(exc instanceof HCatException);
+    assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
+
+
+    //Test for null partition value map
+    exc = null;
+    try {
+      runMRCreate(null, partitionColumns, writeRecords, 20);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    assertTrue(exc != null);
+    assertTrue(exc instanceof HCatException);
+    assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+
+    //Read should get 10 + 20 rows
+    runMRRead(30);
+
+    //Read with partition filter
+    runMRRead(10, "part1 = \"p1value1\"");
+    runMRRead(20, "part1 = \"p1value2\"");
+    runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+
+    tableSchemaTest();
+    columnOrderChangeTest();
+    hiveReadTest();
+  }
+
+
+  //Test that new columns get added to the table schema
+  private void tableSchemaTest() throws Exception {
+
+    HCatSchema tableSchema = getTableSchema();
+
+    assertEquals(3, tableSchema.getFields().size());
+
+    //Update partition schema to have 3 fields
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+
+    writeRecords = new ArrayList<HCatRecord>();
+
+    for(int i = 0;i < 20;i++) {
+      List<Object> objList = new ArrayList<Object>();
+
+      objList.add(i);
+      objList.add("strvalue" + i);
+      objList.add("str2value" + i);
+
+      writeRecords.add(new DefaultHCatRecord(objList));
+    }
+
+    Map<String, String> partitionMap = new HashMap<String, String>();
+    partitionMap.put("part1", "p1value5");
+
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+
+    tableSchema = getTableSchema();
+
+    //assert that c3 has got added to table schema
+    assertEquals(4, tableSchema.getFields().size());
+    assertEquals("c1", tableSchema.getFields().get(0).getName());
+    assertEquals("c2", tableSchema.getFields().get(1).getName());
+    assertEquals("c3", tableSchema.getFields().get(2).getName());
+    assertEquals("part1", tableSchema.getFields().get(3).getName());
+
+    //Test that changing column data type fails
+    partitionMap.clear();
+    partitionMap.put("part1", "p1value6");
+
+    partitionColumns = new ArrayList<HCatFieldSchema>();
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.INT_TYPE_NAME, "")));
+
+    IOException exc = null;
+    try {
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    assertTrue(exc != null);
+    assertTrue(exc instanceof HCatException);
+    assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType());
+
+    //Test that partition key is not allowed in data
+    partitionColumns = new ArrayList<HCatFieldSchema>();
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", Constants.STRING_TYPE_NAME, "")));
+
+    List<HCatRecord> recordsContainingPartitionCols = new ArrayList<HCatRecord>(20);
+    for(int i = 0;i < 20;i++) {
+      List<Object> objList = new ArrayList<Object>();
+
+      objList.add(i);
+      objList.add("c2value" + i);
+      objList.add("c3value" + i);
+      objList.add("p1value6");
+
+      recordsContainingPartitionCols.add(new DefaultHCatRecord(objList));
+    }
+
+    exc = null;
+    try {
+      runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    List<HCatRecord> records= runMRRead(20,"part1 = \"p1value6\"");
+    assertEquals(20, records.size());
+    Integer i =0;
+    for(HCatRecord rec : records){
+      assertEquals(4, rec.size());
+      assertTrue(rec.get(0).equals(i));
+      assertTrue(rec.get(1).equals("c2value"+i));
+      assertTrue(rec.get(2).equals("c3value"+i));
+      assertTrue(rec.get(3).equals("p1value6"));
+      i++;
+    }
+  }
+
+  //Check behavior when the order of columns is changed
+  private void columnOrderChangeTest() throws Exception {
+
+    HCatSchema tableSchema = getTableSchema();
+
+    assertEquals(4, tableSchema.getFields().size());
+
+    partitionColumns = new ArrayList<HCatFieldSchema>();
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+
+
+    writeRecords = new ArrayList<HCatRecord>();
+
+    for(int i = 0;i < 10;i++) {
+      List<Object> objList = new ArrayList<Object>();
+
+      objList.add(i);
+      objList.add("co strvalue" + i);
+      objList.add("co str2value" + i);
+
+      writeRecords.add(new DefaultHCatRecord(objList));
+    }
+
+    Map<String, String> partitionMap = new HashMap<String, String>();
+    partitionMap.put("part1", "p1value8");
+
+
+    Exception exc = null;
+    try {
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    assertTrue(exc != null);
+    assertTrue(exc instanceof HCatException);
+    assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType());
+
+
+    partitionColumns = new ArrayList<HCatFieldSchema>();
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+    partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+
+    writeRecords = new ArrayList<HCatRecord>();
+
+    for(int i = 0;i < 10;i++) {
+      List<Object> objList = new ArrayList<Object>();
+
+      objList.add(i);
+      objList.add("co strvalue" + i);
+
+      writeRecords.add(new DefaultHCatRecord(objList));
+    }
+
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+
+    //Read should get 10 + 20 + 10 + 10 + 20 rows
+    runMRRead(70);
+  }
+
+  //Test that data inserted through HCatOutputFormat is readable from Hive
+  private void hiveReadTest() throws Exception {
+
+    String query = "select * from " + tableName;
+    int retCode = driver.run(query).getResponseCode();
+
+    if( retCode != 0 ) {
+      throw new Exception("Error " + retCode + " running query " + query);
+    }
+
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    assertEquals(70, res.size());
+  }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
+
+public class MyPigStorageDriver extends PigStorageInputDriver{
+
+  @Override
+  public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
+    if ( !"control-A".equals(storageDriverArgs.getProperty(PigStorageInputDriver.delim))){
+      /* This is the only way to make testcase fail. Throwing exception from
+       * here doesn't propagate up.
+       */
+      System.exit(1);
+    }
+    super.initialize(context, storageDriverArgs);
+  }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.data.Pair;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.UDFContext;
+
+public class TestHCatLoader extends TestCase {
+
+  private static final String BASIC_TABLE = "junit_unparted_basic";
+  private static final String COMPLEX_TABLE = "junit_unparted_complex";
+  private static final String PARTITIONED_TABLE = "junit_parted_basic";
+  private static MiniCluster cluster = MiniCluster.buildCluster();
+  private static Driver driver;
+  private static Properties props;
+
+  private static final String basicFile = "/tmp/basic.input.data";
+  private static final String complexFile = "/tmp/complex.input.data";
+  private static String fullFileNameBasic;
+  private static String fullFileNameComplex;
+
+  private static int guardTestCount = 5; // ugh, should be derived via introspection in guardedSetUpBeforeClass
+  private static boolean setupHasRun = false;
+
+  private static Map<Integer,Pair<Integer,String>> basicInputData;
+
+  private void dropTable(String tablename) throws IOException{
+    driver.run("drop table "+tablename);
+  }
+  private void createTable(String tablename, String schema, String partitionedBy) throws IOException{
+    String createTable;
+    createTable = "create table "+tablename+"("+schema+") ";
+    if ((partitionedBy != null)&&(!partitionedBy.trim().isEmpty())){
+      createTable = createTable + "partitioned by ("+partitionedBy+") ";
+    }
+    createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+        "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+"]");
+    }
+  }
+
+  private void createTable(String tablename, String schema) throws IOException{
+    createTable(tablename,schema,null);
+  }
+
+  protected void guardedSetUpBeforeClass() throws Exception {
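+    // JUnit 3 TestCase has no @BeforeClass, so guard the one-time setup with a static flag.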
+    if (!setupHasRun){
+      setupHasRun = true;
+    }else{
+      return;
+    }
+
+    HiveConf hiveConf = new HiveConf(this.getClass());
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+    fullFileNameBasic = cluster.getProperties().getProperty("fs.default.name") + basicFile;
+    fullFileNameComplex = cluster.getProperties().getProperty("fs.default.name") + complexFile;
+
+    cleanup();
+
+    createTable(BASIC_TABLE,"a int, b string");
+    createTable(COMPLEX_TABLE,
+        "name string, studentid int, "
+        + "contact struct<phno:string,email:string>, "
+        + "currently_registered_courses array<string>, "
+        + "current_grades map<string,string>, "
+        + "phnos array<struct<phno:string,type:string>>");
+
+    createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
+
+
+    int LOOP_SIZE = 3;
+    String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+    basicInputData = new HashMap<Integer,Pair<Integer,String>>();
+    int k = 0;
+    for(int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      for(int j=1;j<=LOOP_SIZE;j++) {
+        String sj = "S"+j+"S";
+        input[k] = si + "\t" + sj;
+        basicInputData.put(k, new Pair<Integer,String>(i,sj));
+        k++;
+      }
+    }
+    MiniCluster.createInputFile(cluster, basicFile, input);
+
+    MiniCluster.createInputFile(cluster, complexFile,
+        new String[]{
+        //"Henry Jekyll\t42\t(415-253-6367,hjekyll@contemporary.edu.uk)\t{(PHARMACOLOGY),(PSYCHIATRY)},[PHARMACOLOGY#A-,PSYCHIATRY#B+],{(415-253-6367,cell),(408-253-6367,landline)}",
+        //"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}",
+        }
+    );
+
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+fullFileNameBasic+"' as (a:int, b:chararray);");
+
+    server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+    server.registerQuery("B = foreach A generate a,b;");
+    server.registerQuery("B2 = filter B by a < 2;");
+    server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
+
+    server.registerQuery("C = foreach A generate a,b;");
+    server.registerQuery("C2 = filter C by a >= 2;");
+    server.registerQuery("store C2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
+
+    server.registerQuery("D = load '"+fullFileNameComplex+"' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});");
+    server.registerQuery("store D into '"+COMPLEX_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+    server.executeBatch();
+
+  }
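+
+  // delete the MiniCluster input files and drop all three test tables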
+  private void cleanup() throws IOException {
+    MiniCluster.deleteFile(cluster, basicFile);
+    MiniCluster.deleteFile(cluster, complexFile);
+    dropTable(BASIC_TABLE);
+    dropTable(COMPLEX_TABLE);
+    dropTable(PARTITIONED_TABLE);
+  }
+
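+  // guarded teardown: counts down on every tearDown() and runs cleanup() once the counter drops to zero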
+  protected void guardedTearDownAfterClass() throws Exception {
+    guardTestCount--;
+    if (guardTestCount > 0){
+      return;
+    }
+    cleanup();
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    guardedSetUpBeforeClass();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    guardedTearDownAfterClass();
+  }
+
+  public void testSchemaLoadBasic() throws IOException{
+
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+
+    // test that schema was loaded correctly
+    server.registerQuery("X = load '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+    Schema dumpedXSchema = server.dumpSchema("X");
+    List<FieldSchema> Xfields = dumpedXSchema.getFields();
+    assertEquals(2,Xfields.size());
+    assertTrue(Xfields.get(0).alias.equalsIgnoreCase("a"));
+    assertTrue(Xfields.get(0).type == DataType.INTEGER);
+    assertTrue(Xfields.get(1).alias.equalsIgnoreCase("b"));
+    assertTrue(Xfields.get(1).type == DataType.CHARARRAY);
+
+  }
+
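+  // read the basic table back through HCatLoader and compare every tuple with the generated input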
+  public void testReadDataBasic() throws IOException {
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+
+    server.registerQuery("X = load '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+    Iterator<Tuple> XIter = server.openIterator("X");
+    int numTuplesRead = 0;
+    while( XIter.hasNext() ){
+      Tuple t = XIter.next();
+      assertEquals(2,t.size());
+      assertTrue(t.get(0).getClass() == Integer.class);
+      assertTrue(t.get(1).getClass() == String.class);
+      assertEquals(t.get(0),basicInputData.get(numTuplesRead).first);
+      assertEquals(t.get(1),basicInputData.get(numTuplesRead).second);
+      numTuplesRead++;
+    }
+    assertEquals(basicInputData.size(),numTuplesRead);
+  }
+
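+  // verify that Hive struct/array/map columns surface in Pig as tuple/bag/map fields
+  // (the complex table holds no rows, so only the schema is examined)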
+  public void testSchemaLoadComplex() throws IOException{
+
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+
+    // test that schema was loaded correctly
+    server.registerQuery("K = load '"+COMPLEX_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+    Schema dumpedKSchema = server.dumpSchema("K");
+    List<FieldSchema> Kfields = dumpedKSchema.getFields();
+    assertEquals(6,Kfields.size());
+
+    assertEquals(DataType.CHARARRAY,Kfields.get(0).type);
+    assertEquals("name",Kfields.get(0).alias.toLowerCase());
+
+    assertEquals( DataType.INTEGER,Kfields.get(1).type);
+    assertEquals("studentid",Kfields.get(1).alias.toLowerCase());
+
+    assertEquals(DataType.TUPLE,Kfields.get(2).type);
+    assertEquals("contact",Kfields.get(2).alias.toLowerCase());
+    {
+      assertNotNull(Kfields.get(2).schema);
+      assertTrue(Kfields.get(2).schema.getFields().size() == 2);
+      assertTrue(Kfields.get(2).schema.getFields().get(0).type == DataType.CHARARRAY);
+      assertTrue(Kfields.get(2).schema.getFields().get(0).alias.equalsIgnoreCase("phno"));
+      assertTrue(Kfields.get(2).schema.getFields().get(1).type == DataType.CHARARRAY);
+      assertTrue(Kfields.get(2).schema.getFields().get(1).alias.equalsIgnoreCase("email"));
+    }
+    assertEquals(DataType.BAG,Kfields.get(3).type);
+    assertEquals("currently_registered_courses",Kfields.get(3).alias.toLowerCase());
+    {
+      assertNotNull(Kfields.get(3).schema);
+      assertEquals(1,Kfields.get(3).schema.getFields().size());
+      assertEquals(DataType.TUPLE,Kfields.get(3).schema.getFields().get(0).type);
+      assertNotNull(Kfields.get(3).schema.getFields().get(0).schema);
+      assertEquals(1,Kfields.get(3).schema.getFields().get(0).schema.getFields().size());
+      assertEquals(DataType.CHARARRAY,Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).type);
+      // assertEquals("course",Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
+      // commented out because the inner field name is autogenerated ("innerfield") by default: we call it
+      // "course" in Pig, but it is anonymous in the metadata, so an autogenerated name here is expected and fine.
+    }
+    assertEquals(DataType.MAP,Kfields.get(4).type);
+    assertEquals("current_grades",Kfields.get(4).alias.toLowerCase());
+    assertEquals(DataType.BAG,Kfields.get(5).type);
+    assertEquals("phnos",Kfields.get(5).alias.toLowerCase());
+    {
+      assertNotNull(Kfields.get(5).schema);
+      assertEquals(1,Kfields.get(5).schema.getFields().size());
+      assertEquals(DataType.TUPLE,Kfields.get(5).schema.getFields().get(0).type);
+      assertNotNull(Kfields.get(5).schema.getFields().get(0).schema);
+      assertTrue(Kfields.get(5).schema.getFields().get(0).schema.getFields().size() == 2);
+      assertEquals(DataType.CHARARRAY,Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).type);
+      assertEquals("phno",Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
+      assertEquals(DataType.CHARARRAY,Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).type);
+      assertEquals("type",Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).alias.toLowerCase());
+    }
+
+  }
+
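+  // cross-check the partitioned table's row count against the Hive driver, confirm the partition
+  // column shows up in the schema, then filter on the partition column bkt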
+  public void testReadPartitionedBasic() throws IOException {
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+
+    driver.run("select * from "+PARTITIONED_TABLE);
+    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    assertEquals(basicInputData.size(),valuesReadFromHiveDriver.size());
+
+    server.registerQuery("W = load '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+    Schema dumpedWSchema = server.dumpSchema("W");
+    List<FieldSchema> Wfields = dumpedWSchema.getFields();
+    assertEquals(3,Wfields.size());
+    assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
+    assertTrue(Wfields.get(0).type == DataType.INTEGER);
+    assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
+    assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
+    assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
+    assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
+
+    Iterator<Tuple> WIter = server.openIterator("W");
+    Collection<Pair<Integer,String>> valuesRead = new ArrayList<Pair<Integer,String>>();
+    while( WIter.hasNext() ){
+      Tuple t = WIter.next();
+      assertTrue(t.size() == 3);
+      assertTrue(t.get(0).getClass() == Integer.class);
+      assertTrue(t.get(1).getClass() == String.class);
+      assertTrue(t.get(2).getClass() == String.class);
+      valuesRead.add(new Pair<Integer,String>((Integer)t.get(0),(String)t.get(1)));
+      if ((Integer)t.get(0) < 2){
+        assertEquals("0",t.get(2));
+      }else{
+        assertEquals("1",t.get(2));
+      }
+    }
+    assertEquals(valuesReadFromHiveDriver.size(),valuesRead.size());
+
+    server.registerQuery("P1 = load '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+    server.registerQuery("P1filter = filter P1 by bkt == '0';");
+    Iterator<Tuple> P1Iter = server.openIterator("P1filter");
+    int count1 = 0;
+    while( P1Iter.hasNext() ) {
+      Tuple t = P1Iter.next();
+
+      assertEquals("0", t.get(2));
+      assertEquals(1, t.get(0));
+      count1++;
+    }
+    assertEquals(3, count1);
+
+    server.registerQuery("P2 = load '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+    server.registerQuery("P2filter = filter P2 by bkt == '1';");
+    Iterator<Tuple> P2Iter = server.openIterator("P2filter");
+    int count2 = 0;
+    while( P2Iter.hasNext() ) {
+      Tuple t = P2Iter.next();
+
+      assertEquals("1", t.get(2));
+      assertTrue(((Integer) t.get(0)) > 1);
+      count2++;
+    }
+    assertEquals(6, count2);
+  }
+
+  public void testProjectionsBasic() throws IOException {
+
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+
+    // projections are handled with generate, not with an "as" clause on the load
+
+    server.registerQuery("Y1 = load '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
+    server.registerQuery("Y2 = foreach Y1 generate a;");
+    server.registerQuery("Y3 = foreach Y1 generate b,a;");
+    Schema dumpedY2Schema = server.dumpSchema("Y2");
+    Schema dumpedY3Schema = server.dumpSchema("Y3");
+    List<FieldSchema> Y2fields = dumpedY2Schema.getFields();
+    List<FieldSchema> Y3fields = dumpedY3Schema.getFields();
+    assertEquals(1,Y2fields.size());
+    assertEquals("a",Y2fields.get(0).alias.toLowerCase());
+    assertEquals(DataType.INTEGER,Y2fields.get(0).type);
+    assertEquals(2,Y3fields.size());
+    assertEquals("b",Y3fields.get(0).alias.toLowerCase());
+    assertEquals(DataType.CHARARRAY,Y3fields.get(0).type);
+    assertEquals("a",Y3fields.get(1).alias.toLowerCase());
+    assertEquals(DataType.INTEGER,Y3fields.get(1).type);
+
+    int numTuplesRead = 0;
+    Iterator<Tuple> Y2Iter = server.openIterator("Y2");
+    while( Y2Iter.hasNext() ){
+      Tuple t = Y2Iter.next();
+      assertEquals(1,t.size());
+      assertTrue(t.get(0).getClass() == Integer.class);
+      assertEquals(t.get(0),basicInputData.get(numTuplesRead).first);
+      numTuplesRead++;
+    }
+    numTuplesRead = 0;
+    Iterator<Tuple> Y3Iter = server.openIterator("Y3");
+    while( Y3Iter.hasNext() ){
+      Tuple t = Y3Iter.next();
+      assertEquals(2,t.size());
+      assertTrue(t.get(0).getClass() == String.class);
+      assertEquals(t.get(0),basicInputData.get(numTuplesRead).second);
+      assertTrue(t.get(1).getClass() == Integer.class);
+      assertEquals(t.get(1),basicInputData.get(numTuplesRead).first);
+      numTuplesRead++;
+    }
+    assertEquals(basicInputData.size(),numTuplesRead);
+  }
+}