You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ma...@apache.org on 2011/05/04 17:50:43 UTC

svn commit: r1099539 [2/3] - in /incubator/hcatalog/trunk/src: java/org/apache/hcatalog/mapreduce/ java/org/apache/hcatalog/pig/ test/org/apache/hcatalog/mapreduce/ test/org/apache/hcatalog/pig/

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java Wed May  4 17:50:42 2011
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatEximOutputCommitter;
+import org.apache.hcatalog.mapreduce.HCatEximOutputFormat;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Pig store function that writes its input through {@link HCatEximOutputFormat}
+ * to the output location given at construction time.
+ *
+ * <p>The location of the Pig store statement names the target table, either as
+ * {@code table} or as {@code database.table}. The partition specification and the
+ * optional Pig schema string are passed on to {@link HCatBaseStorer}.
+ */
+
+public class HCatEximStorer extends HCatBaseStorer {
+
+  private static final Log LOG = LogFactory.getLog(HCatEximStorer.class);
+
+  /** Output location handed to {@code HCatEximOutputFormat.setOutput}. */
+  private final String outputLocation;
+
+  /**
+   * Creates a storer without a partition specification or a schema string.
+   *
+   * @param outputLocation output location handed to the output format
+   */
+  public HCatEximStorer(String outputLocation) throws FrontendException, ParseException {
+    this(outputLocation, null, null);
+  }
+
+  /**
+   * Creates a storer with a partition specification but without a schema string.
+   *
+   * @param outputLocation output location handed to the output format
+   * @param partitionSpec partition specification, passed on to {@link HCatBaseStorer}
+   */
+  public HCatEximStorer(String outputLocation, String partitionSpec) throws FrontendException,
+      ParseException {
+    this(outputLocation, partitionSpec, null);
+  }
+
+  /**
+   * Creates a storer.
+   *
+   * @param outputLocation output location handed to the output format
+   * @param partitionSpec partition specification, passed on to {@link HCatBaseStorer};
+   *        may be {@code null}
+   * @param schema schema string, passed on to {@link HCatBaseStorer}; may be {@code null}
+   */
+  public HCatEximStorer(String outputLocation, String partitionSpec, String schema)
+      throws FrontendException, ParseException {
+    super(partitionSpec, schema);
+    this.outputLocation = outputLocation;
+    LOG.debug("HCatEximStorer called");
+  }
+
+  /** Returns a new {@link HCatEximOutputFormat}. */
+  @Override
+  public OutputFormat getOutputFormat() throws IOException {
+    LOG.debug("getOutputFormat called");
+    return new HCatEximOutputFormat();
+  }
+
+  /**
+   * Configures {@code job} for storing into the table named by {@code location}.
+   *
+   * <p>When not running from the backend, the Pig schema is validated and converted
+   * to an {@link HCatSchema}, the output format is set up, and the resulting job
+   * settings are saved in the UDF properties. When running from the backend, the
+   * saved settings are copied back into the job configuration.
+   *
+   * @param location {@code table} or {@code database.table}
+   * @param job job whose configuration is read and updated
+   * @throws IOException if the schema cannot be determined or fails validation, or
+   *         if the output information is missing
+   */
+  @Override
+  public void setStoreLocation(String location, Job job) throws IOException {
+    LOG.debug("setStoreLocation called with :" + location);
+    // Only an exact "database.table" location selects a database; anything else
+    // uses the default database and the first dot-separated segment as table name.
+    String[] userStr = location.split("\\.");
+    String dbname = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+    String tablename;
+    if (userStr.length == 2) {
+      dbname = userStr[0];
+      tablename = userStr[1];
+    } else {
+      tablename = userStr[0];
+    }
+    Properties p = UDFContext.getUDFContext()
+        .getUDFProperties(this.getClass(), new String[] {sign});
+    Configuration config = job.getConfiguration();
+    if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) {
+      Schema schema = (Schema) ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA));
+      if (schema != null) {
+        pigSchema = schema;
+      }
+      if (pigSchema == null) {
+        throw new FrontendException("Schema for data cannot be determined.",
+            PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+      // Validate and convert against an empty table schema.
+      HCatSchema hcatTblSchema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+      try {
+        doSchemaValidations(pigSchema, hcatTblSchema);
+      } catch (HCatException he) {
+        throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+      }
+
+      // Every partition key becomes a STRING column; values follow the same iteration order.
+      List<HCatFieldSchema> hcatFields = new ArrayList<HCatFieldSchema>();
+      List<String> partVals = new ArrayList<String>();
+      for (String key : partitions.keySet()) {
+        hcatFields.add(new HCatFieldSchema(key, HCatFieldSchema.Type.STRING, ""));
+        partVals.add(partitions.get(key));
+      }
+
+      HCatSchema outputSchema = convertPigSchemaToHCatSchema(pigSchema,
+          hcatTblSchema);
+      LOG.debug("Pig Schema '" + pigSchema + "' was converted to HCatSchema '"
+          + outputSchema + "'");
+      HCatEximOutputFormat.setOutput(job,
+          dbname, tablename,
+          outputLocation,
+          new HCatSchema(hcatFields),
+          partVals,
+          outputSchema);
+      p.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(outputSchema));
+      String outputInfo = config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+      if (outputInfo == null) {
+        // Properties rejects null values; fail with a message instead of a bare NPE.
+        throw new FrontendException("Output information ("
+            + HCatConstants.HCAT_KEY_OUTPUT_INFO + ") is missing from the job configuration.",
+            PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+      p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, outputInfo);
+      String hiveConf = config.get(HCatConstants.HCAT_KEY_HIVE_CONF);
+      if (hiveConf != null) {
+        p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF, hiveConf);
+      }
+    } else {
+      String outputInfo = p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+      if (outputInfo == null) {
+        throw new IOException("Output information ("
+            + HCatConstants.HCAT_KEY_OUTPUT_INFO + ") is missing from the UDF properties.");
+      }
+      config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, outputInfo);
+      String hiveConf = p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF);
+      if (hiveConf != null) {
+        config.set(HCatConstants.HCAT_KEY_HIVE_CONF, hiveConf);
+      }
+    }
+  }
+
+  /**
+   * Runs the output committer's job cleanup when the job tracker is {@code "local"}.
+   *
+   * @param schema not used
+   * @param arg1 not used
+   * @param job job whose configuration is checked and which is cleaned up
+   */
+  @Override
+  public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
+    if ("local".equalsIgnoreCase(job.getConfiguration().get("mapred.job.tracker", ""))) {
+      // In local mode, mapreduce will not call HCatEximOutputCommitter.cleanupJob.
+      // Calling it from here so that the partition publish happens.
+      // This call needs to be removed after MAPREDUCE-1447 is fixed.
+      new HCatEximOutputCommitter(null).cleanupJob(job);
+    }
+  }
+}

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1099539&r1=1099538&r2=1099539&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java Wed May  4 17:50:42 2011
@@ -18,7 +18,6 @@
 package org.apache.hcatalog.pig;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
@@ -27,48 +26,34 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.pig.Expression;
+import org.apache.pig.Expression.BinaryExpression;
 import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.LoadPushDown;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.Expression.BinaryExpression;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.UDFContext;
 
 /**
  * Pig {@link LoadFunc} to read data from Howl
  */
 
-public class HCatLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+public class HCatLoader extends HCatBaseLoader {
 
-  private static final String PRUNE_PROJECTION_INFO = "prune.projection.info";
   private static final String PARTITION_FILTER = "partition.filter"; // for future use
 
   private HCatInputFormat howlInputFormat = null;
-  private RecordReader<?, ?> reader;
   private String dbName;
   private String tableName;
   private String howlServerUri;
-  private String signature;
   private String partitionFilterString;
   private final PigHCatUtil phutil = new PigHCatUtil();
 
-  HCatSchema outputSchema = null;
-
   @Override
   public InputFormat<?,?> getInputFormat() throws IOException {
     if(howlInputFormat == null) {
@@ -78,34 +63,6 @@ public class HCatLoader extends LoadFunc
   }
 
   @Override
-  public Tuple getNext() throws IOException {
-    try {
-      HCatRecord hr =  (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null);
-      Tuple t = PigHCatUtil.transformToTuple(hr,outputSchema);
-      // TODO : we were discussing an iter interface, and also a LazyTuple
-      // change this when plans for that solidifies.
-      return t;
-    } catch (ExecException e) {
-      int errCode = 6018;
-      String errMsg = "Error while reading input";
-      throw new ExecException(errMsg, errCode,
-          PigException.REMOTE_ENVIRONMENT, e);
-    } catch (Exception eOther){
-      int errCode = 6018;
-      String errMsg = "Error converting read value to tuple";
-      throw new ExecException(errMsg, errCode,
-          PigException.REMOTE_ENVIRONMENT, eOther);
-    }
-
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
-    this.reader = reader;
-  }
-
-  @Override
   public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
     return location;
   }
@@ -207,12 +164,6 @@ public class HCatLoader extends LoadFunc
   }
 
   @Override
-  public ResourceStatistics getStatistics(String location, Job job) throws IOException {
-    // statistics not implemented currently
-    return null;
-  }
-
-  @Override
   public void setPartitionFilter(Expression partitionFilter) throws IOException {
     // convert the partition filter expression into a string expected by
     // howl and pass it in setLocation()
@@ -224,37 +175,6 @@ public class HCatLoader extends LoadFunc
         PARTITION_FILTER, partitionFilterString);
   }
 
-  @Override
-  public List<OperatorSet> getFeatures() {
-    return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
-  }
-
-  @Override
-  public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldsInfo) throws FrontendException {
-    // Store the required fields information in the UDFContext so that we
-    // can retrieve it later.
-    storeInUDFContext(signature, PRUNE_PROJECTION_INFO, requiredFieldsInfo);
-
-    // Howl will always prune columns based on what we ask of it - so the
-    // response is true
-    return new RequiredFieldResponse(true);
-  }
-
-  @Override
-  public void setUDFContextSignature(String signature) {
-    this.signature = signature;
-  }
-
-
-  // helper methods
-  private void storeInUDFContext(String signature, String key, Object value) {
-    UDFContext udfContext = UDFContext.getUDFContext();
-    Properties props = udfContext.getUDFProperties(
-        this.getClass(), new String[] {signature});
-    props.put(key, value);
-  }
-
-
   private String getPartitionFilterString() {
     if(partitionFilterString == null) {
       Properties props = UDFContext.getUDFContext().getUDFProperties(

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1099539&r1=1099538&r2=1099539&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Wed May  4 17:50:42 2011
@@ -19,85 +19,39 @@
 package org.apache.hcatalog.pig;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.DefaultHCatRecord;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
 import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.StoreMetadata;
-import org.apache.pig.backend.BackendException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Utils;
 
 /**
  * HowlStorer.
  *
  */
 
-public class HCatStorer extends StoreFunc implements StoreMetadata {
+public class HCatStorer extends HCatBaseStorer {
 
   /**
    *
    */
-  private static final String COMPUTED_OUTPUT_SCHEMA = "howl.output.schema";
-  private final Map<String,String> partitions;
-  private Schema pigSchema;
-  private RecordWriter<WritableComparable<?>, HCatRecord> writer;
-  private HCatSchema computedSchema;
-  private static final String PIG_SCHEMA = "howl.pig.store.schema";
-  private String sign;
 
   public HCatStorer(String partSpecs, String schema) throws ParseException, FrontendException {
-
-    partitions = new HashMap<String, String>();
-    if(partSpecs != null && !partSpecs.trim().isEmpty()){
-      String[] partKVPs = partSpecs.split(",");
-      for(String partKVP : partKVPs){
-        String[] partKV = partKVP.split("=");
-        if(partKV.length == 2) {
-          partitions.put(partKV[0].trim(), partKV[1].trim());
-        } else {
-          throw new FrontendException("Invalid partition column specification. "+partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
-        }
-      }
-    }
-
-    if(schema != null) {
-      pigSchema = Utils.getSchemaFromString(schema);
-    }
-
+    super(partSpecs, schema);
   }
 
   public HCatStorer(String partSpecs) throws ParseException, FrontendException {
@@ -109,353 +63,11 @@ public class HCatStorer extends StoreFun
   }
 
   @Override
-  public void checkSchema(ResourceSchema resourceSchema) throws IOException {
-
-    /*  Schema provided by user and the schema computed by Pig
-     * at the time of calling store must match.
-     */
-    Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
-    if(pigSchema != null){
-      if(! Schema.equals(runtimeSchema, pigSchema, false, true) ){
-        throw new FrontendException("Schema provided in store statement doesn't match with the Schema" +
-            "returned by Pig run-time. Schema provided in HowlStorer: "+pigSchema.toString()+ " Schema received from Pig runtime: "+runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-    } else {
-      pigSchema = runtimeSchema;
-    }
-    UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA,ObjectSerializer.serialize(pigSchema));
-  }
-
-  /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing
-   * schema of the table in metastore.
-   */
-  private HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{
-
-    List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
-    for(FieldSchema fSchema : pigSchema.getFields()){
-      byte type = fSchema.type;
-      HCatFieldSchema howlFSchema;
-
-      try {
-
-        // Find out if we need to throw away the tuple or not.
-        if(type == DataType.BAG && removeTupleFromBag(tableSchema, fSchema)){
-          List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
-          arrFields.add(getHowlFSFromPigFS(fSchema.schema.getField(0).schema.getField(0)));
-          howlFSchema = new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), null);
-      }
-      else{
-          howlFSchema = getHowlFSFromPigFS(fSchema);
-      }
-      fieldSchemas.add(howlFSchema);
-      } catch (HCatException he){
-          throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he);
-      }
-    }
-
-    return new HCatSchema(fieldSchemas);
-  }
-
-  private void validateUnNested(Schema innerSchema) throws FrontendException{
-
-    for(FieldSchema innerField : innerSchema.getFields()){
-      validateAlias(innerField.alias);
-      if(DataType.isComplex(innerField.type)) {
-        throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-    }
-  }
-
-  private boolean removeTupleFromBag(HCatSchema tableSchema, FieldSchema bagFieldSchema) throws HCatException{
-
-    String colName = bagFieldSchema.alias;
-    for(HCatFieldSchema field : tableSchema.getFields()){
-      if(colName.equalsIgnoreCase(field.getName())){
-        return (field.getArrayElementSchema().get(0).getType() == Type.STRUCT) ? false : true;
-      }
-    }
-    // Column was not found in table schema. Its a new column
-    List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
-    return (tupSchema.size() == 1 && tupSchema.get(0).schema == null) ? true : false;
-  }
-
-
-  private HCatFieldSchema getHowlFSFromPigFS(FieldSchema fSchema) throws FrontendException, HCatException{
-
-    byte type = fSchema.type;
-    switch(type){
-
-    case DataType.CHARARRAY:
-    case DataType.BIGCHARARRAY:
-      return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
-
-    case DataType.INTEGER:
-      return new HCatFieldSchema(fSchema.alias, Type.INT, null);
-
-    case DataType.LONG:
-      return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
-
-    case DataType.FLOAT:
-      return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
-
-    case DataType.DOUBLE:
-      return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
-
-    case DataType.BAG:
-      Schema bagSchema = fSchema.schema;
-      List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
-      arrFields.add(getHowlFSFromPigFS(bagSchema.getField(0)));
-      return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
-
-    case DataType.TUPLE:
-      List<String> fieldNames = new ArrayList<String>();
-      List<HCatFieldSchema> howlFSs = new ArrayList<HCatFieldSchema>();
-      for( FieldSchema fieldSchema : fSchema.schema.getFields()){
-        fieldNames.add( fieldSchema.alias);
-        howlFSs.add(getHowlFSFromPigFS(fieldSchema));
-      }
-      return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(howlFSs), "");
-
-    case DataType.MAP:{
-      // Pig's schema contain no type information about map's keys and
-      // values. So, if its a new column assume <string,string> if its existing
-      // return whatever is contained in the existing column.
-      HCatFieldSchema mapField = getTableCol(fSchema.alias, howlTblSchema);
-      HCatFieldSchema valFS;
-      List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
-
-      if(mapField != null){
-        Type mapValType = mapField.getMapValueSchema().get(0).getType();
-
-        switch(mapValType){
-        case STRING:
-        case BIGINT:
-        case INT:
-        case FLOAT:
-        case DOUBLE:
-          valFS = new HCatFieldSchema(fSchema.alias, mapValType, null);
-          break;
-        default:
-          throw new FrontendException("Only pig primitive types are supported as map value types.", PigHCatUtil.PIG_EXCEPTION_CODE);
-        }
-        valFSList.add(valFS);
-        return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
-      }
-
-      // Column not found in target table. Its a new column. Its schema is map<string,string>
-      valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
-      valFSList.add(valFS);
-      return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
-     }
-
-    default:
-      throw new FrontendException("Unsupported type: "+type+"  in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
-    }
-  }
-
-  @Override
   public OutputFormat getOutputFormat() throws IOException {
     return new HCatOutputFormat();
   }
 
   @Override
-  public void prepareToWrite(RecordWriter writer) throws IOException {
-    this.writer = writer;
-    computedSchema = (HCatSchema)ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
-  }
-
-  @Override
-  public void putNext(Tuple tuple) throws IOException {
-
-    List<Object> outgoing = new ArrayList<Object>(tuple.size());
-
-    int i = 0;
-    for(HCatFieldSchema fSchema : computedSchema.getFields()){
-      outgoing.add(getJavaObj(tuple.get(i++), fSchema));
-    }
-    try {
-      writer.write(null, new DefaultHCatRecord(outgoing));
-    } catch (InterruptedException e) {
-      throw new BackendException("Error while writing tuple: "+tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
-    }
-  }
-
-  private Object getJavaObj(Object pigObj, HCatFieldSchema howlFS) throws ExecException, HCatException{
-
-    // The real work-horse. Spend time and energy in this method if there is
-    // need to keep HowlStorer lean and go fast.
-    Type type = howlFS.getType();
-
-    switch(type){
-
-    case STRUCT:
-      // Unwrap the tuple.
-      return ((Tuple)pigObj).getAll();
-      //        Tuple innerTup = (Tuple)pigObj;
-      //
-      //      List<Object> innerList = new ArrayList<Object>(innerTup.size());
-      //      int i = 0;
-      //      for(HowlTypeInfo structFieldTypeInfo : typeInfo.getAllStructFieldTypeInfos()){
-      //        innerList.add(getJavaObj(innerTup.get(i++), structFieldTypeInfo));
-      //      }
-      //      return innerList;
-    case ARRAY:
-      // Unwrap the bag.
-      DataBag pigBag = (DataBag)pigObj;
-      HCatFieldSchema tupFS = howlFS.getArrayElementSchema().get(0);
-      boolean needTuple = tupFS.getType() == Type.STRUCT;
-      List<Object> bagContents = new ArrayList<Object>((int)pigBag.size());
-      Iterator<Tuple> bagItr = pigBag.iterator();
-
-      while(bagItr.hasNext()){
-        // If there is only one element in tuple contained in bag, we throw away the tuple.
-        bagContents.add(needTuple ? getJavaObj(bagItr.next(), tupFS) : bagItr.next().get(0));
-
-      }
-      return bagContents;
-
-      //    case MAP:
-      //     Map<String,DataByteArray> pigMap = (Map<String,DataByteArray>)pigObj;
-      //     Map<String,Long> typeMap = new HashMap<String, Long>();
-      //     for(Entry<String, DataByteArray> entry: pigMap.entrySet()){
-      //       typeMap.put(entry.getKey(), new Long(entry.getValue().toString()));
-      //     }
-      //     return typeMap;
-    default:
-      return pigObj;
-    }
-  }
-
-  @Override
-  public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
-
-    // Need to necessarily override this method since default impl assumes HDFS
-    // based location string.
-    return location;
-  }
-
-  @Override
-  public void setStoreFuncUDFContextSignature(String signature) {
-    sign = signature;
-  }
-
-
-  private void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException{
-
-    // Iterate through all the elements in Pig Schema and do validations as
-    // dictated by semantics, consult HCatSchema of table when need be.
-
-    for(FieldSchema pigField : pigSchema.getFields()){
-      byte type = pigField.type;
-      String alias = pigField.alias;
-      validateAlias(alias);
-      HCatFieldSchema howlField = getTableCol(alias, tblSchema);
-
-      if(DataType.isComplex(type)){
-        switch(type){
-
-        case DataType.MAP:
-          if(howlField != null){
-            if(howlField.getMapKeyType() != Type.STRING){
-              throw new FrontendException("Key Type of map must be String "+howlField,  PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-            if(howlField.getMapValueSchema().get(0).isComplex()){
-              throw new FrontendException("Value type of map cannot be complex" + howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-          }
-          break;
-
-        case DataType.BAG:
-          // Only map is allowed as complex type in tuples inside bag.
-          for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
-            if(innerField.type == DataType.BAG || innerField.type == DataType.TUPLE) {
-              throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-            validateAlias(innerField.alias);
-          }
-          if(howlField != null){
-            // Do the same validation for HCatSchema.
-            HCatFieldSchema arrayFieldScehma = howlField.getArrayElementSchema().get(0);
-            Type hType = arrayFieldScehma.getType();
-            if(hType == Type.STRUCT){
-              for(HCatFieldSchema structFieldInBag : arrayFieldScehma.getStructSubSchema().getFields()){
-                if(structFieldInBag.getType() == Type.STRUCT || structFieldInBag.getType() == Type.ARRAY){
-                  throw new FrontendException("Nested Complex types not allowed "+ howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
-                }
-              }
-            }
-            if(hType == Type.MAP){
-              if(arrayFieldScehma.getMapKeyType() != Type.STRING){
-                throw new FrontendException("Key Type of map must be String "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-              if(arrayFieldScehma.getMapValueSchema().get(0).isComplex()){
-                throw new FrontendException("Value type of map cannot be complex "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-            }
-            if(hType == Type.ARRAY) {
-              throw new FrontendException("Arrays cannot contain array within it. "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-          }
-          break;
-
-        case DataType.TUPLE:
-          validateUnNested(pigField.schema);
-          if(howlField != null){
-            for(HCatFieldSchema structFieldSchema : howlField.getStructSubSchema().getFields()){
-              if(structFieldSchema.isComplex()){
-                throw new FrontendException("Nested Complex types are not allowed."+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-            }
-          }
-          break;
-
-        default:
-          throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
-        }
-      }
-    }
-
-    for(HCatFieldSchema howlField : tblSchema.getFields()){
-
-      // We dont do type promotion/demotion.
-      Type hType = howlField.getType();
-      switch(hType){
-      case SMALLINT:
-      case TINYINT:
-      case BOOLEAN:
-        throw new FrontendException("Incompatible type found in howl table schema: "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-    }
-  }
-
-  private void validateAlias(String alias) throws FrontendException{
-    if(alias == null) {
-      throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
-    }
-    if(alias.matches(".*[A-Z]+.*")) {
-      throw new FrontendException("Column names should all be in lowercase. Invalid name found: "+alias, PigHCatUtil.PIG_EXCEPTION_CODE);
-    }
-  }
-
-  // Finds column by name in HCatSchema, if not found returns null.
-  private HCatFieldSchema getTableCol(String alias, HCatSchema tblSchema){
-
-    for(HCatFieldSchema howlField : tblSchema.getFields()){
-      if(howlField.getName().equalsIgnoreCase(alias)){
-        return howlField;
-      }
-    }
-    // Its a new column
-    return null;
-  }
-  HCatSchema howlTblSchema;
-
-  @Override
-  public void cleanupOnFailure(String location, Job job) throws IOException {
-    // No-op.
-  }
-
-  @Override
   public void setStoreLocation(String location, Job job) throws IOException {
 
     Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
@@ -489,7 +101,7 @@ public class HCatStorer extends StoreFun
           // information passed to HCatOutputFormat was not right
           throw new PigException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
       }
-      howlTblSchema = HCatOutputFormat.getTableSchema(job);
+      HCatSchema howlTblSchema = HCatOutputFormat.getTableSchema(job);
       try{
         doSchemaValidations(pigSchema, howlTblSchema);
       } catch(HCatException he){
@@ -528,8 +140,4 @@ public class HCatStorer extends StoreFun
       new HCatOutputCommitter(null).cleanupJob(job);
     }
   }
-
-  @Override
-  public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
-  }
 }

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java Wed May  4 17:50:42 2011
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.TestHCatEximInputFormat.TestImport.EmpDetails;
+
+/**
+ *
+ * TestHCatEximInputFormat. Primarily tests HCatEximInputFormat, but also
+ * exercises HCatEximOutputFormat, which is used to produce the exports.
+ *
+ */
+public class TestHCatEximInputFormat extends TestCase {
+
+  /**
+   * Mapper for the export jobs: converts one comma-separated text line into
+   * an HCatRecord using the table schema obtained from HCatEximOutputFormat.
+   */
+  public static class TestExport extends
+      org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, HCatRecord> {
+
+    private HCatSchema recordSchema;
+
+    /** Looks up the table schema and keeps it for use by map(). */
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      super.setup(context);
+      recordSchema = HCatEximOutputFormat.getTableSchema(context);
+    }
+
+    /**
+     * Splits the line on commas and copies the first four fields into the
+     * emp_id, emp_name, emp_dob and emp_sex columns; any further fields are
+     * ignored.
+     */
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      final String[] fields = value.toString().split(",");
+      final HCatRecord row = new DefaultHCatRecord(recordSchema.size());
+      final int empId = Integer.parseInt(fields[0]);
+      row.setInteger("emp_id", recordSchema, empId);
+
+      // The string columns follow emp_id in the same order as the input fields.
+      final String[] stringColumns = {"emp_name", "emp_dob", "emp_sex"};
+      for (int i = 0; i < stringColumns.length; i++) {
+        row.setString(stringColumns[i], recordSchema, fields[i + 1]);
+      }
+      context.write(key, row);
+    }
+  }
+
+  /**
+   * Mapper for the import jobs: stores every HCatRecord it receives in the
+   * static empRecords map, keyed by emp_id, where the tests inspect it.
+   */
+  public static class TestImport extends
+      org.apache.hadoop.mapreduce.Mapper<
+      org.apache.hadoop.io.LongWritable, HCatRecord,
+      org.apache.hadoop.io.Text,
+      org.apache.hadoop.io.Text> {
+
+    private HCatSchema recordSchema;
+
+    /** Holder for the non-key columns of one employee row; unread columns stay null. */
+    public static class EmpDetails {
+      public String emp_name;
+      public String emp_dob;
+      public String emp_sex;
+      public String emp_country;
+      public String emp_state;
+    }
+
+    // Rows seen by map(), ordered by emp_id. NOTE(review): the tests read this
+    // after the job finishes, which presumably relies on the mapper running
+    // in the test's own JVM - confirm before using a non-local job runner.
+    public static Map<Integer, EmpDetails> empRecords = new TreeMap<Integer, EmpDetails>();
+
+    /** Fetches the output schema from the job configuration and prints it. */
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      super.setup(context);
+      try {
+        recordSchema = HCatBaseInputFormat.getOutputSchema(context);
+      } catch (Exception cause) {
+        throw new IOException("Error getting outputschema from job configuration", cause);
+      }
+      System.out.println("RecordSchema : " + recordSchema.toString());
+    }
+
+    /**
+     * Copies emp_name, plus whichever of the optional columns the output
+     * schema contains, into a new EmpDetails and files it under emp_id.
+     */
+    @Override
+    public void map(LongWritable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+      final EmpDetails details = new EmpDetails();
+      final Integer empId = value.getInteger("emp_id", recordSchema);
+      details.emp_name = value.getString("emp_name", recordSchema);
+
+      // A column that is absent from the schema is simply left null.
+      final boolean hasDob = recordSchema.getPosition("emp_dob") != null;
+      if (hasDob) {
+        details.emp_dob = value.getString("emp_dob", recordSchema);
+      }
+      final boolean hasSex = recordSchema.getPosition("emp_sex") != null;
+      if (hasSex) {
+        details.emp_sex = value.getString("emp_sex", recordSchema);
+      }
+      final boolean hasCountry = recordSchema.getPosition("emp_country") != null;
+      if (hasCountry) {
+        details.emp_country = value.getString("emp_country", recordSchema);
+      }
+      final boolean hasState = recordSchema.getPosition("emp_state") != null;
+      if (hasState) {
+        details.emp_state = value.getString("emp_state", recordSchema);
+      }
+      empRecords.put(empId, details);
+    }
+  }
+
+  private static final String dbName = "hcatEximOutputFormatTestDB"; // database name passed to HCatEximOutputFormat.setOutput
+  private static final String tblName = "hcatEximOutputFormatTestTable"; // table name passed to HCatEximOutputFormat.setOutput
+  Configuration conf; // recreated by every setUp() call
+  Job job; // recreated by every setUp() call; tests call setUp() again to get a fresh job
+  List<HCatFieldSchema> columns; // emp_id, emp_name, emp_dob, emp_sex (built in setUp)
+  HCatSchema schema; // schema wrapping 'columns'
+  FileSystem fs; // LocalFileSystem created in setUp
+  Path inputLocation; // <working dir>/tmp/exports: target location handed to the export jobs
+  Path outputLocation; // <working dir>/tmp/data: text input of export jobs, text output of import jobs
+  private HCatSchema partSchema; // partition keys; assigned only by the partitioned tests, not by setUp
+
+
+  /**
+   * Builds a fresh job and the four-column employee schema, and resolves the
+   * export and data directories under the local working directory. The tests
+   * also call this directly to obtain a new Job between runs.
+   */
+  @Override
+  protected void setUp() throws Exception {
+    System.out.println("Setup started");
+    super.setUp();
+    conf = new Configuration();
+    job = new Job(conf, "test eximinputformat");
+
+    // emp_id is the only int column; the remaining columns are strings.
+    columns = new ArrayList<HCatFieldSchema>();
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(
+        new FieldSchema("emp_id", Constants.INT_TYPE_NAME, "")));
+    for (String stringColumn : new String[] {"emp_name", "emp_dob", "emp_sex"}) {
+      columns.add(HCatSchemaUtils.getHCatFieldSchema(
+          new FieldSchema(stringColumn, Constants.STRING_TYPE_NAME, "")));
+    }
+    schema = new HCatSchema(columns);
+
+    fs = new LocalFileSystem();
+    fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+    final Path workingDir = fs.getWorkingDirectory();
+    inputLocation = new Path(workingDir, "tmp/exports");
+    outputLocation = new Path(workingDir, "tmp/data");
+
+    job.setJarByClass(this.getClass());
+    job.setNumReduceTasks(0);
+    System.out.println("Setup done");
+  }
+
+  /**
+   * Writes the given records to outputLocation as a text file (replacing any
+   * existing one) and configures the job to read it with TextInputFormat and
+   * write through HCatEximOutputFormat using the TestExport mapper.
+   *
+   * @param records lines to write; each entry is written as-is, so it must
+   *          carry its own line terminator if one is wanted
+   * @throws IOException if the file cannot be deleted, created or written
+   */
+  private void setupMRExport(String[] records) throws IOException {
+    if (fs.exists(outputLocation)) {
+      fs.delete(outputLocation, true);
+    }
+    FSDataOutputStream ds = fs.create(outputLocation, true);
+    try {
+      for (String record : records) {
+        ds.writeBytes(record);
+      }
+    } finally {
+      // Close even when a write fails so the file handle is not leaked.
+      ds.close();
+    }
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(HCatEximOutputFormat.class);
+    TextInputFormat.setInputPaths(job, outputLocation);
+    job.setMapperClass(TestExport.class);
+  }
+
+  /**
+   * Prepares the job for an import run: clears outputLocation, reads through
+   * HCatEximInputFormat with the TestImport mapper, writes text output to
+   * outputLocation, and empties the records collected by earlier imports.
+   */
+  private void setupMRImport() throws IOException {
+    final boolean staleOutput = fs.exists(outputLocation);
+    if (staleOutput) {
+      fs.delete(outputLocation, true);
+    }
+    job.setMapperClass(TestImport.class);
+    job.setInputFormatClass(HCatEximInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    TextOutputFormat.setOutputPath(job, outputLocation);
+    TestImport.empRecords.clear();
+  }
+
+
+  /**
+   * Removes the export and data directories created by the test so nothing
+   * is left behind under the working directory.
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    System.out.println("Teardown started");
+    super.tearDown();
+    // fs is assigned in setUp(); the guard only matters if setUp() was cut short.
+    if (fs != null) {
+      fs.delete(inputLocation, true);
+      fs.delete(outputLocation, true);
+    }
+    System.out.println("Teardown done");
+  }
+
+
+  /**
+   * Exports four employee rows as a non-partitioned table into inputLocation,
+   * removing any previous export first, and fails the test if the export job
+   * does not succeed.
+   */
+  private void runNonPartExport() throws IOException, InterruptedException, ClassNotFoundException {
+    if (fs.exists(inputLocation)) {
+      fs.delete(inputLocation, true);
+    }
+    setupMRExport(new String[] {
+        "237,Krishna,01/01/1990,M,IN,TN\n",
+        "238,Kalpana,01/01/2000,F,IN,KA\n",
+        "239,Satya,01/01/2001,M,US,TN\n",
+        "240,Kavya,01/01/2002,F,US,KA\n"
+
+    });
+    // No partition schema and no partition values: the table is not partitioned.
+    HCatEximOutputFormat.setOutput(
+        job,
+        dbName,
+        tblName,
+        inputLocation.toString(),
+        null,
+        null,
+        schema);
+
+    // Fail here, with a clear message, rather than in a later import step.
+    assertTrue("Export job failed", job.waitForCompletion(true));
+    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+    committer.cleanupJob(job);
+  }
+
+  /**
+   * Exports a single row into the partition (country, state) of the table at
+   * inputLocation, using the partition keys currently held in partSchema, and
+   * fails the test if the export job does not succeed. Unlike
+   * runNonPartExport, this does not clear inputLocation first.
+   *
+   * @param record the comma-separated row to export
+   * @param country value for the first partition key
+   * @param state value for the second partition key
+   */
+  private void runPartExport(String record, String country, String state) throws IOException, InterruptedException, ClassNotFoundException {
+    setupMRExport(new String[] {record});
+    List<String> partValues = new ArrayList<String>(2);
+    partValues.add(country);
+    partValues.add(state);
+    HCatEximOutputFormat.setOutput(
+        job,
+        dbName,
+        tblName,
+        inputLocation.toString(),
+        partSchema ,
+        partValues ,
+        schema);
+
+    // Fail here, with a clear message, rather than in a later import step.
+    assertTrue("Export job failed", job.waitForCompletion(true));
+    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+    committer.cleanupJob(job);
+  }
+
+  /**
+   * Exports a non-partitioned table and imports it back without setting an
+   * output schema: the four data columns must round-trip and the partition
+   * columns must stay unset.
+   */
+  public void testNonPart() throws Exception {
+    try {
+      runNonPartExport();
+      // Build a fresh Job for the import; the export already ran the previous one.
+      setUp();
+      setupMRImport();
+      HCatEximInputFormat.setInput(job, "tmp/exports", null);
+      // Report a failed import directly instead of as a record-count mismatch.
+      assertTrue("Import job failed", job.waitForCompletion(true));
+
+      assertEquals(4, TestImport.empRecords.size());
+      assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", null, null);
+      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", null, null);
+      assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", null, null);
+      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", null, null);
+    } catch (Exception e) {
+      System.out.println("Test failed with " + e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * Exports a non-partitioned table and imports it back with an output schema
+   * restricted to emp_id and emp_name: only those two columns may be
+   * populated in the imported rows.
+   */
+  public void testNonPartProjection() throws Exception {
+    try {
+
+      runNonPartExport();
+      // Build a fresh Job for the import; the export already ran the previous one.
+      setUp();
+      setupMRImport();
+      HCatEximInputFormat.setInput(job, "tmp/exports", null);
+
+      List<HCatFieldSchema> readColumns = new ArrayList<HCatFieldSchema>();
+      readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+            Constants.INT_TYPE_NAME, "")));
+      readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+            Constants.STRING_TYPE_NAME, "")));
+
+      HCatEximInputFormat.setOutputSchema(job, new HCatSchema(readColumns));
+      // Report a failed import directly instead of as a record-count mismatch.
+      assertTrue("Import job failed", job.waitForCompletion(true));
+
+      assertEquals(4, TestImport.empRecords.size());
+      assertEmpDetail(TestImport.empRecords.get(237), "Krishna", null, null, null, null);
+      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", null, null, null, null);
+      assertEmpDetail(TestImport.empRecords.get(239), "Satya", null, null, null, null);
+      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", null, null, null, null);
+    } catch (Exception e) {
+      System.out.println("Test failed with " + e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * Exports four rows into four (emp_country, emp_state) partitions and
+   * imports them back without setting an output schema: each row must come
+   * back with its data columns and the partition values it was exported under.
+   */
+  public void testPart() throws Exception {
+    try {
+      if (fs.exists(inputLocation)) {
+        fs.delete(inputLocation, true);
+      }
+
+      List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
+      partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
+      partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
+      partSchema = new HCatSchema(partKeys);
+
+      // setUp() between exports only rebuilds the Job; partSchema is left untouched.
+      runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
+      setUp();
+      runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
+      setUp();
+      runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
+      setUp();
+      runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");
+
+      setUp();
+      setupMRImport();
+      HCatEximInputFormat.setInput(job, "tmp/exports", null);
+      // Report a failed import directly instead of as a record-count mismatch.
+      assertTrue("Import job failed", job.waitForCompletion(true));
+
+      assertEquals(4, TestImport.empRecords.size());
+      assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn");
+      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
+      assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn");
+      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
+    } catch (Exception e) {
+      System.out.println("Test failed with " + e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * Same export as testPart, but the import explicitly requests an output
+   * schema made of the data columns followed by the partition keys; every row
+   * must come back with all six values.
+   */
+  public void testPartWithPartCols() throws Exception {
+    try {
+      if (fs.exists(inputLocation)) {
+        fs.delete(inputLocation, true);
+      }
+
+      List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
+      partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
+      partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
+      partSchema = new HCatSchema(partKeys);
+
+      // setUp() between exports only rebuilds the Job; partSchema is left untouched.
+      runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
+      setUp();
+      runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
+      setUp();
+      runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
+      setUp();
+      runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");
+
+      setUp();
+      setupMRImport();
+      HCatEximInputFormat.setInput(job, "tmp/exports", null);
+
+      List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
+      colsPlusPartKeys.addAll(columns);
+      colsPlusPartKeys.addAll(partKeys);
+
+      HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(colsPlusPartKeys));
+      // Report a failed import directly instead of as a record-count mismatch.
+      assertTrue("Import job failed", job.waitForCompletion(true));
+
+      assertEquals(4, TestImport.empRecords.size());
+      assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn");
+      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
+      assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn");
+      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
+    } catch (Exception e) {
+      System.out.println("Test failed with " + e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+
+  /**
+   * Same export as testPart, but the import passes a filter of
+   * emp_state = "ka": only the two rows exported under that state may be read.
+   */
+  public void testPartSelection() throws Exception {
+    try {
+      if (fs.exists(inputLocation)) {
+        fs.delete(inputLocation, true);
+      }
+
+      List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>(2);
+      partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, ""));
+      partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, ""));
+      partSchema = new HCatSchema(partKeys);
+
+      // setUp() between exports only rebuilds the Job; partSchema is left untouched.
+      runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn");
+      setUp();
+      runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka");
+      setUp();
+      runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn");
+      setUp();
+      runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka");
+
+      setUp();
+      setupMRImport();
+      Map<String, String> filter = new TreeMap<String, String>();
+      filter.put("emp_state", "ka");
+      HCatEximInputFormat.setInput(job, "tmp/exports", filter);
+      // Report a failed import directly instead of as a record-count mismatch.
+      assertTrue("Import job failed", job.waitForCompletion(true));
+
+      assertEquals(2, TestImport.empRecords.size());
+      assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka");
+      assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka");
+    } catch (Exception e) {
+      System.out.println("Test failed with " + e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+
+  /**
+   * Asserts that an imported row exists and that each of its columns equals
+   * the expected value; pass null for a column that should not have been read.
+   */
+  private void assertEmpDetail(EmpDetails empDetails, String name, String dob, String mf, String country, String state) {
+    assertNotNull(empDetails);
+    final String[] expected = {name, dob, mf, country, state};
+    final String[] actual = {
+        empDetails.emp_name, empDetails.emp_dob, empDetails.emp_sex,
+        empDetails.emp_country, empDetails.emp_state
+    };
+    // Compared in column order, so the first mismatching column is the one reported.
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals(expected[i], actual[i]);
+    }
+  }
+
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java Wed May  4 17:50:42 2011
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+/**
+ *
+ * TestHCatEximOutputFormat. Some basic testing here. More testing done via
+ * TestHCatEximInputFormat
+ *
+ */
+public class TestHCatEximOutputFormat extends TestCase {
+
+  /**
+   * Mapper for the export jobs: converts one comma-separated text line into
+   * an HCatRecord using the table schema obtained from HCatEximOutputFormat,
+   * logging the first four fields on the way.
+   */
+  public static class TestMap extends
+      Mapper<LongWritable, Text, LongWritable, HCatRecord> {
+
+    private HCatSchema recordSchema;
+
+    /** Looks up the table schema and keeps it for use by map(). */
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      super.setup(context);
+      recordSchema = HCatEximOutputFormat.getTableSchema(context);
+      System.out.println("TestMap/setup called");
+    }
+
+    /**
+     * Splits the line on commas, prints the first four fields, and emits them
+     * as the emp_id, emp_name, emp_dob and emp_sex columns of one record.
+     */
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      final String[] fields = value.toString().split(",");
+      final HCatRecord row = new DefaultHCatRecord(recordSchema.size());
+      for (int i = 0; i < 4; i++) {
+        System.out.println("TestMap/map called. Cols[" + i + "]:" + fields[i]);
+      }
+      final int empId = Integer.parseInt(fields[0]);
+      row.setInteger("emp_id", recordSchema, empId);
+      row.setString("emp_name", recordSchema, fields[1]);
+      row.setString("emp_dob", recordSchema, fields[2]);
+      row.setString("emp_sex", recordSchema, fields[3]);
+      context.write(key, row);
+    }
+  }
+
+
+  private static final String dbName = "hcatEximOutputFormatTestDB"; // database name expected back in the exported metadata
+  private static final String tblName = "hcatEximOutputFormatTestTable"; // table name expected back in the exported metadata
+  Configuration conf; // created in setUp
+  Job job; // export job configured in setUp
+  List<HCatFieldSchema> columns; // emp_id, emp_name, emp_dob, emp_sex (built in setUp)
+  HCatSchema schema; // schema wrapping 'columns'
+  FileSystem fs; // LocalFileSystem created in setUp
+  Path outputLocation; // <working dir>/tmp/exports: where the export is written
+  Path dataLocation; // <working dir>/tmp/data: text input file written by setUp
+
+  /**
+   * Exports the input as a non-partitioned table, then reads the _metadata
+   * file from the export location and checks the table name, columns,
+   * storage driver/format/serde settings, and that no partition keys or
+   * partitions were recorded.
+   */
+  public void testNonPart() throws Exception {
+    try {
+      // No partition schema and no partition values: the table is not partitioned.
+      HCatEximOutputFormat.setOutput(
+          job,
+          dbName,
+          tblName,
+          outputLocation.toString(),
+          null,
+          null,
+          schema);
+
+      // Stop here if the export job failed instead of reading metadata it never wrote.
+      assertTrue("Export job failed", job.waitForCompletion(true));
+      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+      committer.cleanupJob(job);
+
+      Path metadataPath = new Path(outputLocation, "_metadata");
+      Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
+      Table table = rv.getKey();
+      List<Partition> partitions = rv.getValue();
+
+      assertEquals(dbName, table.getDbName());
+      assertEquals(tblName, table.getTableName());
+      assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+          HCatUtil.getFieldSchemaList(columns)));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+          table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+          table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+      assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+          table.getSd().getInputFormat());
+      assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+          table.getSd().getOutputFormat());
+      assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+          table.getSd().getSerdeInfo().getSerializationLib());
+      assertEquals(0, table.getPartitionKeys().size());
+
+      assertEquals(0, partitions.size());
+    } catch (Exception e) {
+      System.out.println("Test failed with " + e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+
+  }
+
+  /**
+   * Exports the input into the partition (emp_country=IN, emp_state=TN),
+   * then reads the _metadata file from the export location and checks the
+   * table settings, the two partition keys, and the single recorded
+   * partition with its values and storage driver parameters.
+   */
+  public void testPart() throws Exception {
+    try {
+      List<HCatFieldSchema> partKeys = new ArrayList<HCatFieldSchema>();
+      partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_country",
+          Constants.STRING_TYPE_NAME, "")));
+      partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_state",
+          Constants.STRING_TYPE_NAME, "")));
+      HCatSchema partitionSchema = new HCatSchema(partKeys);
+
+      List<String> partitionVals = new ArrayList<String>();
+      partitionVals.add("IN");
+      partitionVals.add("TN");
+
+      HCatEximOutputFormat.setOutput(
+          job,
+          dbName,
+          tblName,
+          outputLocation.toString(),
+          partitionSchema,
+          partitionVals,
+          schema);
+
+      // Stop here if the export job failed instead of reading metadata it never wrote.
+      assertTrue("Export job failed", job.waitForCompletion(true));
+      HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+      committer.cleanupJob(job);
+      Path metadataPath = new Path(outputLocation, "_metadata");
+      Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);
+      Table table = rv.getKey();
+      List<Partition> partitions = rv.getValue();
+
+      assertEquals(dbName, table.getDbName());
+      assertEquals(tblName, table.getTableName());
+      assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+          HCatUtil.getFieldSchemaList(columns)));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+          table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+          table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+      assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+          table.getSd().getInputFormat());
+      assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+          table.getSd().getOutputFormat());
+      assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+          table.getSd().getSerdeInfo().getSerializationLib());
+      assertEquals(2, table.getPartitionKeys().size());
+      List<FieldSchema> partSchema = table.getPartitionKeys();
+      assertEquals("emp_country", partSchema.get(0).getName());
+      assertEquals("emp_state", partSchema.get(1).getName());
+
+      assertEquals(1, partitions.size());
+      Partition partition = partitions.get(0);
+      assertEquals("IN", partition.getValues().get(0));
+      assertEquals("TN", partition.getValues().get(1));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+          partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+          partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+    } catch (Exception e) {
+      System.out.println("Test failed with " + e.getMessage());
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * Builds the export job and the four-column employee schema, clears the
+   * export and data locations under the local working directory, and writes
+   * the four-line text input that the job reads through TextInputFormat.
+   */
+  @Override
+  protected void setUp() throws Exception {
+    System.out.println("Setup started");
+    super.setUp();
+    conf = new Configuration();
+    job = new Job(conf, "test eximoutputformat");
+    columns = new ArrayList<HCatFieldSchema>();
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+        Constants.INT_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
+        Constants.STRING_TYPE_NAME, "")));
+    schema = new HCatSchema(columns);
+
+    fs = new LocalFileSystem();
+    fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
+    outputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports");
+    if (fs.exists(outputLocation)) {
+      fs.delete(outputLocation, true);
+    }
+    dataLocation = new Path(fs.getWorkingDirectory(), "tmp/data");
+    if (fs.exists(dataLocation)) {
+      fs.delete(dataLocation, true);
+    }
+    FSDataOutputStream ds = fs.create(dataLocation, true);
+    try {
+      ds.writeBytes("237,Krishna,01/01/1990,M,IN,TN\n");
+      ds.writeBytes("238,Kalpana,01/01/2000,F,IN,KA\n");
+      ds.writeBytes("239,Satya,01/01/2001,M,US,TN\n");
+      ds.writeBytes("240,Kavya,01/01/2002,F,US,KA\n");
+    } finally {
+      // Close even when a write fails so the file handle is not leaked.
+      ds.close();
+    }
+
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(HCatEximOutputFormat.class);
+    TextInputFormat.setInputPaths(job, dataLocation);
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(TestMap.class);
+    job.setNumReduceTasks(0);
+    System.out.println("Setup done");
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    System.out.println("Teardown started");
+    super.tearDown();
+    fs.delete(dataLocation, true); // recursive delete of the text input written by setUp()
+    fs.delete(outputLocation, true); // recursive delete of the export location
+    System.out.println("Teardown done");
+  }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java Wed May  4 17:50:42 2011
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hcatalog.MiniCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ *
+ * TestHCatEximLoader. Assumes Exim storer is working well
+ *
+ */
+public class TestHCatEximLoader extends TestCase {
+
+  private static final String NONPART_TABLE = "junit_unparted";
+  private static final String PARTITIONED_TABLE = "junit_parted";
+  private static MiniCluster cluster = MiniCluster.buildCluster();
+
+  private static final String dataLocation = "/tmp/data";
+  private static String fqdataLocation;
+  private static final String exportLocation = "/tmp/export";
+  private static String fqexportLocation;
+
+  private static Properties props;
+
+  private void cleanup() throws IOException {
+    MiniCluster.deleteFile(cluster, dataLocation);
+    MiniCluster.deleteFile(cluster, exportLocation);
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
+        + ", fs.default.name : " + props.getProperty("fs.default.name"));
+    fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation;
+    fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation;
+    System.out.println("FQ Data Location :" + fqdataLocation);
+    System.out.println("FQ Export Location :" + fqexportLocation);
+    cleanup();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    cleanup();
+  }
+
+  private void populateDataFile() throws IOException {
+    MiniCluster.deleteFile(cluster, dataLocation);
+    String[] input = new String[] {
+        "237,Krishna,01/01/1990,M,IN,TN",
+        "238,Kalpana,01/01/2000,F,IN,KA",
+        "239,Satya,01/01/2001,M,US,TN",
+        "240,Kavya,01/01/2002,F,US,KA"
+    };
+    MiniCluster.createInputFile(cluster, dataLocation, input);
+  }
+
+  private static class EmpDetail {
+    String name;
+    String dob;
+    String mf;
+    String country;
+    String state;
+  }
+
+  private void assertEmpDetail(Tuple t, Map<Integer, EmpDetail> eds) throws ExecException {
+    assertNotNull(t);
+    assertEquals(6, t.size());
+
+    assertTrue(t.get(0).getClass() == Integer.class);
+    assertTrue(t.get(1).getClass() == String.class);
+    assertTrue(t.get(2).getClass() == String.class);
+    assertTrue(t.get(3).getClass() == String.class);
+    assertTrue(t.get(4).getClass() == String.class);
+    assertTrue(t.get(5).getClass() == String.class);
+
+    EmpDetail ed = eds.remove(t.get(0));
+    assertNotNull(ed);
+
+    assertEquals(ed.name, t.get(1));
+    assertEquals(ed.dob, t.get(2));
+    assertEquals(ed.mf, t.get(3));
+    assertEquals(ed.country, t.get(4));
+    assertEquals(ed.state, t.get(5));
+  }
+
+  private void addEmpDetail(Map<Integer, EmpDetail> empDetails, int id, String name,
+      String dob, String mf, String country, String state) {
+    EmpDetail ed = new EmpDetail();
+    ed.name = name;
+    ed.dob = dob;
+    ed.mf = mf;
+    ed.country = country;
+    ed.state = state;
+    empDetails.put(id, ed);
+  }
+
+
+
+  private void assertEmpDetail(Tuple t, Integer id, String name, String dob, String mf)
+      throws ExecException {
+    assertNotNull(t);
+    assertEquals(4, t.size());
+    assertTrue(t.get(0).getClass() == Integer.class);
+    assertTrue(t.get(1).getClass() == String.class);
+    assertTrue(t.get(2).getClass() == String.class);
+    assertTrue(t.get(3).getClass() == String.class);
+
+    assertEquals(id, t.get(0));
+    assertEquals(name, t.get(1));
+    assertEquals(dob, t.get(2));
+    assertEquals(mf, t.get(3));
+  }
+
+  private void assertEmpDetail(Tuple t, String mf, String name)
+      throws ExecException {
+    assertNotNull(t);
+    assertEquals(2, t.size());
+    assertTrue(t.get(0).getClass() == String.class);
+    assertTrue(t.get(1).getClass() == String.class);
+
+    assertEquals(mf, t.get(0));
+    assertEquals(name, t.get(1));
+  }
+
+
+
+  public void testLoadNonPartTable() throws Exception {
+    populateDataFile();
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+      server.setBatchOn();
+      server
+          .registerQuery("A = load '"
+              + fqdataLocation
+              + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+      server.registerQuery("store A into '" + NONPART_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
+      server.executeBatch();
+    }
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+
+      server
+          .registerQuery("A = load '"
+              + fqexportLocation
+              + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+      Iterator<Tuple> XIter = server.openIterator("A");
+      assertTrue(XIter.hasNext());
+      Tuple t = XIter.next();
+      assertEmpDetail(t, 237, "Krishna", "01/01/1990", "M");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, 238, "Kalpana", "01/01/2000", "F");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, 239, "Satya", "01/01/2001", "M");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, 240, "Kavya", "01/01/2002", "F");
+      assertFalse(XIter.hasNext());
+    }
+  }
+
+  public void testLoadNonPartProjection() throws Exception {
+    populateDataFile();
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+      server.setBatchOn();
+      server
+          .registerQuery("A = load '"
+              + fqdataLocation
+              + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+      server.registerQuery("store A into '" + NONPART_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
+      server.executeBatch();
+    }
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+
+      server
+          .registerQuery("A = load '"
+              + fqexportLocation
+              + "' using org.apache.hcatalog.pig.HCatEximLoader();");
+      server.registerQuery("B = foreach A generate emp_sex, emp_name;");
+
+      Iterator<Tuple> XIter = server.openIterator("B");
+      assertTrue(XIter.hasNext());
+      Tuple t = XIter.next();
+      assertEmpDetail(t, "M", "Krishna");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, "F", "Kalpana");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, "M", "Satya");
+      assertTrue(XIter.hasNext());
+      t = XIter.next();
+      assertEmpDetail(t, "F", "Kavya");
+      assertFalse(XIter.hasNext());
+    }
+  }
+
+
+  public void testLoadMultiPartTable() throws Exception {
+    {
+      populateDataFile();
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+      server.setBatchOn();
+      server
+          .registerQuery("A = load '"
+              + fqdataLocation +
+              "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"
+          );
+      server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
+      server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
+      server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
+      server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
+      server.registerQuery("store INTN into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=in,emp_state=tn');");
+      server.registerQuery("store INKA into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=in,emp_state=ka');");
+      server.registerQuery("store USTN into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=us,emp_state=tn');");
+      server.registerQuery("store USKA into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=us,emp_state=ka');");
+      server.executeBatch();
+    }
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+
+      server
+          .registerQuery("A = load '"
+              + fqexportLocation
+              + "' using org.apache.hcatalog.pig.HCatEximLoader() "
+              //+ "as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);");
+              + ";");
+
+      Iterator<Tuple> XIter = server.openIterator("A");
+
+      Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
+      addEmpDetail(empDetails, 237, "Krishna", "01/01/1990", "M", "in", "tn");
+      addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
+      addEmpDetail(empDetails, 239, "Satya", "01/01/2001", "M", "us", "tn");
+      addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
+
+      while(XIter.hasNext()) {
+        Tuple t = XIter.next();
+        assertNotSame(0, empDetails.size());
+        assertEmpDetail(t, empDetails);
+      }
+      assertEquals(0, empDetails.size());
+    }
+  }
+
+  public void testLoadMultiPartFilter() throws Exception {
+    {
+      populateDataFile();
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+      server.setBatchOn();
+      server
+          .registerQuery("A = load '"
+              + fqdataLocation +
+              "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"
+          );
+      server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
+      server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
+      server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
+      server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
+      server.registerQuery("store INTN into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=in,emp_state=tn');");
+      server.registerQuery("store INKA into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=in,emp_state=ka');");
+      server.registerQuery("store USTN into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=us,emp_state=tn');");
+      server.registerQuery("store USKA into '" + PARTITIONED_TABLE
+          + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation +
+          "', 'emp_country=us,emp_state=ka');");
+      server.executeBatch();
+    }
+    {
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      UDFContext.getUDFContext().setClientSystemProps();
+
+      server
+          .registerQuery("A = load '"
+              + fqexportLocation
+              + "' using org.apache.hcatalog.pig.HCatEximLoader() "
+              + ";");
+      server.registerQuery("B = filter A by emp_state == 'ka';");
+
+      Iterator<Tuple> XIter = server.openIterator("B");
+
+      Map<Integer, EmpDetail> empDetails = new TreeMap<Integer, EmpDetail>();
+      addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka");
+      addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka");
+
+      while(XIter.hasNext()) {
+        Tuple t = XIter.next();
+        assertNotSame(0, empDetails.size());
+        assertEmpDetail(t, empDetails);
+      }
+      assertEquals(0, empDetails.size());
+    }
+  }
+
+
+}