You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ha...@apache.org on 2011/11/01 21:14:53 UTC

svn commit: r1196278 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/cli/SemanticAnalysis/ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/ src/java/org/apache/hcatalog/pig/d...

Author: hashutosh
Date: Tue Nov  1 21:14:50 2011
New Revision: 1196278

URL: http://svn.apache.org/viewvc?rev=1196278&view=rev
Log:
HCATALOG-121 : TextStorageOutputDriver for Pig

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Nov  1 21:14:50 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-121. TextStorageOutputDriver for Pig (daijyc via hashutosh)  
+
   HCAT-129. HBase Storage Driver Test doesn't use unique test dir for warehouse (toffer via khorgath)
 
   HCAT-109. HBase Storage Handler for HCatalog (avandana via khorgath)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/AlterTableFileFormatHook.java Tue Nov  1 21:14:50 2011
@@ -40,10 +40,11 @@ import org.apache.hadoop.hive.ql.plan.DD
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+import org.apache.pig.builtin.PigStorage;
 
 public class AlterTableFileFormatHook extends AbstractSemanticAnalyzerHook {
 
-  private String inDriver, outDriver, tableName;
+  private String inDriver, outDriver, tableName, loader, storer;
 
   @Override
   public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast) throws SemanticException {
@@ -65,8 +66,13 @@ public class AlterTableFileFormatHook ex
       "You may specify it through INPUT/OUTPUT storage drivers.");
 
     case HiveParser.TOK_TBLTEXTFILE:
-      throw new SemanticException("Operation not supported. HCatalog doesn't support Text File by default yet. " +
-      "You may specify it through INPUT/OUTPUT storage drivers.");
+        inputFormat      = org.apache.hadoop.mapred.TextInputFormat.class.getName();
+        outputFormat     = org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat.class.getName();
+        inDriver  = org.apache.hcatalog.pig.drivers.LoadFuncBasedInputDriver.class.getName();
+        outDriver = org.apache.hcatalog.pig.drivers.StoreFuncBasedOutputDriver.class.getName();
+        loader = PigStorage.class.getName();
+        storer = PigStorage.class.getName();
+        break;
 
     case HiveParser.TOK_TBLRCFILE:
       inputFormat = RCFileInputFormat.class.getName();
@@ -91,6 +97,14 @@ public class AlterTableFileFormatHook ex
     hcatProps.put(HCatConstants.HCAT_ISD_CLASS, inDriver);
     hcatProps.put(HCatConstants.HCAT_OSD_CLASS, outDriver);
 
+    if (loader!=null) {
+        hcatProps.put(HCatConstants.HCAT_PIG_LOADER, loader);
+    }
+    
+    if (storer!=null) {
+        hcatProps.put(HCatConstants.HCAT_PIG_STORER, storer);
+    }
+    
     try {
       Hive db = context.getHive();
       Table tbl = db.getTable(tableName);

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java Tue Nov  1 21:14:50 2011
@@ -47,13 +47,16 @@ import org.apache.hcatalog.common.AuthUt
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.pig.drivers.LoadFuncBasedInputDriver;
+import org.apache.hcatalog.pig.drivers.StoreFuncBasedOutputDriver;
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
 import org.apache.hcatalog.storagehandler.HCatStorageHandler;
+import org.apache.pig.builtin.PigStorage;
 
 final class CreateTableHook extends AbstractSemanticAnalyzerHook {
 
-    private String inStorageDriver, outStorageDriver, tableName;
+    private String inStorageDriver, outStorageDriver, tableName, loader, storer;
 
     @Override
     public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context,
@@ -97,10 +100,14 @@ final class CreateTableHook extends Abst
                              + "You may specify it through INPUT/OUTPUT storage drivers.");
 
                 case HiveParser.TOK_TBLTEXTFILE:
-                    throw new SemanticException(
-                            "Operation not supported. HCatalog doesn't support " +
-                            "Text File by default yet. "
-                            + "You may specify it through INPUT/OUTPUT storage drivers.");
+                    inputFormat      = org.apache.hadoop.mapred.TextInputFormat.class.getName();
+                    outputFormat     = org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat.class.getName();
+                    inStorageDriver  = org.apache.hcatalog.pig.drivers.LoadFuncBasedInputDriver.class.getName();
+                    outStorageDriver = org.apache.hcatalog.pig.drivers.StoreFuncBasedOutputDriver.class.getName();
+                    loader = PigStorage.class.getName();
+                    storer = PigStorage.class.getName();
+
+                    break;
 
                 case HiveParser.TOK_LIKETABLE:
 
@@ -255,6 +262,14 @@ final class CreateTableHook extends Abst
             }
 
         }
+        
+        if (loader!=null) {
+            tblProps.put(HCatConstants.HCAT_PIG_LOADER, loader);
+        }
+        
+        if (storer!=null) {
+            tblProps.put(HCatConstants.HCAT_PIG_STORER, storer);
+        }
 
         if (desc == null) {
             // Desc will be null if its CREATE TABLE LIKE. Desc will be

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java Tue Nov  1 21:14:50 2011
@@ -42,6 +42,8 @@ public enum ErrorType {
     ERROR_MISSING_PARTITION_KEY         (2011, "Partition key value not provided for publish"),
     ERROR_MOVE_FAILED                   (2012, "Moving of data failed during commit"),
     ERROR_TOO_MANY_DYNAMIC_PTNS         (2013, "Attempt to create too many dynamic partitions"),
+    ERROR_INIT_LOADER                   (2014,  "Error initializing Pig loader"),
+    ERROR_INIT_STORER                   (2015,  "Error initializing Pig storer"),
 
     /* Authorization Errors 3000 - 3999 */
     ERROR_ACCESS_CONTROL           (3000, "Permission denied"),

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Tue Nov  1 21:14:50 2011
@@ -30,6 +30,13 @@ public final class HCatConstants {
   public static final String HCAT_RCFILE_ISD_CLASS = "org.apache.hcatalog.rcfile.RCFileInputDriver";
   public static final String HCAT_RCFILE_OSD_CLASS = "org.apache.hcatalog.rcfile.RCFileOutputDriver";
 
+  public static final String HCAT_PIG_LOADER = "hcat.pig.loader";
+  public static final String HCAT_PIG_LOADER_ARGS = "hcat.pig.loader.args";
+  public static final String HCAT_PIG_STORER = "hcat.pig.storer";
+  public static final String HCAT_PIG_STORER_ARGS = "hcat.pig.storer.args";
+  public static final String HCAT_PIG_ARGS_DELIMIT = "hcat.pig.args.delimiter";
+  public static final String HCAT_PIG_ARGS_DELIMIT_DEFAULT = ",";
+  
   //The keys used to store info into the job Configuration
   public static final String HCAT_KEY_BASE = "mapreduce.lib.hcat";
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Tue Nov  1 21:14:50 2011
@@ -157,10 +157,9 @@ public abstract class HCatBaseOutputForm
           driver.setPartitionValues(jobContext, partitionValues);
           driver.setOutputPath(jobContext, location);
           
-//          HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues);
-
           driver.initialize(jobContext, jobInfo.getTableInfo().getStorerInfo().getProperties());
-
+          
+//          HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues);
           return driver;
       } catch(Exception e) {
         if (e instanceof HCatException){

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java Tue Nov  1 21:14:50 2011
@@ -52,7 +52,7 @@ public abstract class HCatOutputStorageD
      * @return the OutputFormat instance
      * @throws IOException Signals that an I/O exception has occurred.
      */
-    public abstract OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException;
+    public abstract OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException;
 
     /**
      * Set the data location for the output.

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java Tue Nov  1 21:14:50 2011
@@ -53,6 +53,10 @@ public class HCatLoader extends HCatBase
   private String hcatServerUri;
   private String partitionFilterString;
   private final PigHCatUtil phutil = new PigHCatUtil();
+  
+  // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
+  final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
+  final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
 
   @Override
   public InputFormat<?,?> getInputFormat() throws IOException {
@@ -70,6 +74,7 @@ public class HCatLoader extends HCatBase
 @Override
   public void setLocation(String location, Job job) throws IOException {
 
+    job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
     Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
     dbName = dbTablePair.first;
     tableName = dbTablePair.second;

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Tue Nov  1 21:14:50 2011
@@ -48,9 +48,10 @@ import org.apache.pig.impl.util.UDFConte
 
 public class HCatStorer extends HCatBaseStorer {
 
-  /**
-   *
-   */
+  // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
+  final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
+  final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+
 
   public HCatStorer(String partSpecs, String schema) throws Exception {
     super(partSpecs, schema);
@@ -72,6 +73,7 @@ public class HCatStorer extends HCatBase
   @Override
   public void setStoreLocation(String location, Job job) throws IOException {
 
+    job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
     Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
 
     String[] userStr = location.split("\\.");

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java Tue Nov  1 21:14:50 2011
@@ -18,6 +18,8 @@
 package org.apache.hcatalog.pig.drivers;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -28,12 +30,17 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.pig.HCatLoader;
 import org.apache.hcatalog.pig.PigHCatUtil;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 
 
@@ -44,7 +51,7 @@ import org.apache.pig.data.Tuple;
  * and override the initialize(). {@link PigStorageInputDriver} illustrates
  * that well.
  */
-public abstract class LoadFuncBasedInputDriver extends HCatInputStorageDriver{
+public class LoadFuncBasedInputDriver extends HCatInputStorageDriver{
 
   private LoadFuncBasedInputFormat inputFormat;
   private HCatSchema dataSchema;
@@ -99,9 +106,51 @@ public abstract class LoadFuncBasedInput
 
   @Override
   public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-
+    
+    String loaderString = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER);
+    if (loaderString==null) {
+        throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Don't know how to instantiate loader, " + HCatConstants.HCAT_PIG_LOADER + " property is not defined for table ");
+    }
+    String loaderArgs = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER_ARGS);
+    
+    String[] args;
+    if (loaderArgs!=null) {
+        String delimit = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
+        if (delimit==null) {
+            delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
+        }
+        args = loaderArgs.split(delimit);
+    } else {
+        args = new String[0];
+    }
+    
+    try {
+        Class loaderClass = Class.forName(loaderString);
+    
+        Constructor[] constructors = loaderClass.getConstructors();
+        for (Constructor constructor : constructors) {
+            if (constructor.getParameterTypes().length==args.length) {
+                lf = (LoadFunc)constructor.newInstance(args);
+                break;
+            }
+        }
+    } catch (Exception e) {
+        throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString, e);
+    }
+    
+    if (lf==null) {
+        throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString + " with construct args " + loaderArgs);
+    }
+ 
+    // Need to set the right signature in setLocation. The original signature is used by HCatLoader
+    // and it does use this signature to access UDFContext, so we need to invent a new signature for
+    // the wrapped loader.
+    // As for PigStorage/JsonStorage, set signature right before setLocation seems to be good enough,
+    // we may need to set signature more aggressively if we support more loaders
+    String innerSignature = context.getConfiguration().get(HCatLoader.INNER_SIGNATURE);
+    lf.setUDFContextSignature(innerSignature);
     lf.setLocation(location, new Job(context.getConfiguration()));
-    inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema));
+    inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema), location, context.getConfiguration());
   }
 
   private String location;

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java Tue Nov  1 21:14:50 2011
@@ -20,14 +20,17 @@ package org.apache.hcatalog.pig.drivers;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.builtin.PigStorage;
@@ -42,10 +45,15 @@ public class LoadFuncBasedInputFormat ex
   private final LoadFunc loadFunc;
   private static ResourceFieldSchema[] fields;
 
-  public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema) {
+  public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location, Configuration conf) throws IOException {
 
     this.loadFunc = loadFunc;
     fields = dataSchema.getFields();
+    
+    // Simulate the frontend call sequence for LoadFunc, in case LoadFunc need to store something into UDFContext (as JsonLoader does)
+    if (loadFunc instanceof LoadMetadata) {
+        ((LoadMetadata)loadFunc).getSchema(location, new Job(conf));
+    }
   }
 
   @Override
@@ -59,7 +67,6 @@ public class LoadFuncBasedInputFormat ex
   @Override
   public List<InputSplit> getSplits(JobContext jobContext) throws IOException,
   InterruptedException {
-
     try {
       InputFormat<BytesWritable,Tuple> inpFormat = loadFunc.getInputFormat();
       return inpFormat.getSplits(jobContext);
@@ -103,51 +110,57 @@ public class LoadFuncBasedInputFormat ex
 
        for(int i = 0; i < tupleFromDisk.size(); i++) {
 
-         DataByteArray dba = (DataByteArray) tupleFromDisk.get(i);
-
-         if(dba == null) {
-           // PigStorage will insert nulls for empty fields.
-          tupleFromDisk.set(i, null);
-          continue;
-        }
-
-         switch(fields[i].getType()) {
-
-         case DataType.CHARARRAY:
-           tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
-           break;
-
-         case DataType.INTEGER:
-           tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
-           break;
-
-         case DataType.FLOAT:
-           tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
-           break;
-
-         case DataType.LONG:
-           tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
-           break;
-
-         case DataType.DOUBLE:
-           tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
-           break;
-
-         case DataType.MAP:
-           tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
-           break;
-
-         case DataType.BAG:
-           tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
-           break;
-
-         case DataType.TUPLE:
-           tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
-           break;
-
-         default:
-           throw new IOException("Unknown Pig type in data: "+fields[i].getType());
-         }
+         Object data = tupleFromDisk.get(i);
+         
+         // We will do conversion for bytes only for now
+         if (data instanceof DataByteArray) {
+         
+             DataByteArray dba = (DataByteArray) data;
+    
+             if(dba == null) {
+               // PigStorage will insert nulls for empty fields.
+              tupleFromDisk.set(i, null);
+              continue;
+            }
+    
+             switch(fields[i].getType()) {
+    
+             case DataType.CHARARRAY:
+               tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
+               break;
+    
+             case DataType.INTEGER:
+               tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
+               break;
+    
+             case DataType.FLOAT:
+               tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
+               break;
+    
+             case DataType.LONG:
+               tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
+               break;
+    
+             case DataType.DOUBLE:
+               tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
+               break;
+    
+             case DataType.MAP:
+               tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
+               break;
+    
+             case DataType.BAG:
+               tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
+               break;
+    
+             case DataType.TUPLE:
+               tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
+               break;
+    
+             default:
+               throw new IOException("Unknown Pig type in data: "+fields[i].getType());
+             }
+           }
        }
 
        return tupleFromDisk;

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java?rev=1196278&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java Tue Nov  1 21:14:50 2011
@@ -0,0 +1,130 @@
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.FileOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.pig.HCatLoader;
+import org.apache.hcatalog.pig.HCatStorer;
+import org.apache.hcatalog.pig.PigHCatUtil;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+public class StoreFuncBasedOutputDriver extends FileOutputStorageDriver {
+
+    protected StoreFuncInterface sf;
+    private TupleFactory factory = TupleFactory.getInstance();
+    private HCatSchema schema;
+    private String location;
+    
+    @Override
+    public void initialize(JobContext jobContext, Properties hcatProperties) throws IOException {
+        String storerString = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER);
+        if (storerString==null) {
+            throw new HCatException(ErrorType.ERROR_INIT_STORER, "Don't know how to instantiate storer, " + HCatConstants.HCAT_PIG_STORER + " property is not defined for table ");
+        }
+        String storerArgs = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER_ARGS);
+        
+        String[] args;
+        if (storerArgs!=null) {
+            String delimit = hcatProperties.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
+            if (delimit==null) {
+                delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
+            }
+            args = storerArgs.split(delimit);
+        } else {
+            args = new String[0];
+        }
+        
+        try {
+            Class storerClass = Class.forName(storerString);
+        
+            Constructor[] constructors = storerClass.getConstructors();
+            for (Constructor constructor : constructors) {
+                if (constructor.getParameterTypes().length==args.length) {
+                    sf = (StoreFuncInterface)constructor.newInstance(args);
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString, e);
+        }
+        
+        if (sf==null) {
+            throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString + " with construct args " + storerArgs);
+        }
+     
+        super.initialize(jobContext, hcatProperties);
+        
+        Job job = new Job(jobContext.getConfiguration());
+        String innerSignature = jobContext.getConfiguration().get(HCatStorer.INNER_SIGNATURE);
+        
+        // Set signature before invoking StoreFunc methods, see comment in
+        // see comments in LoadFuncBasedInputDriver.initialize
+        sf.setStoreFuncUDFContextSignature(innerSignature);
+        sf.checkSchema(PigHCatUtil.getResourceSchema(schema));
+
+        sf.setStoreLocation(location, job);
+        ConfigurationUtil.mergeConf(jobContext.getConfiguration(), 
+                job.getConfiguration());
+    }
+    
+    @Override
+    public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat()
+            throws IOException {
+        StoreFuncBasedOutputFormat outputFormat = new StoreFuncBasedOutputFormat(sf);
+        return outputFormat;
+    }
+
+    @Override
+    public void setOutputPath(JobContext jobContext, String location)
+            throws IOException {
+        this.location = location;
+    }
+
+    @Override
+    public void setSchema(JobContext jobContext, HCatSchema schema)
+            throws IOException {
+        this.schema = schema;
+    }
+
+    @Override
+    public void setPartitionValues(JobContext jobContext,
+            Map<String, String> partitionValues) throws IOException {
+        // Doing nothing, partition keys are not stored along with the data, so ignore it
+    }
+
+    @Override
+    public WritableComparable<?> generateKey(HCatRecord value)
+            throws IOException {
+        return null;
+    }
+
+    @Override
+    public Writable convertValue(HCatRecord value) throws IOException {
+        Tuple t = factory.newTupleNoCopy(value.getAll());
+        return t;
+    }
+
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java?rev=1196278&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java Tue Nov  1 21:14:50 2011
@@ -0,0 +1,153 @@
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.pig.PigHCatUtil;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+public class StoreFuncBasedOutputFormat extends
+        OutputFormat<BytesWritable, Tuple> {
+
+    private final StoreFuncInterface storeFunc;
+    
+    public StoreFuncBasedOutputFormat(StoreFuncInterface storeFunc) {
+
+        this.storeFunc = storeFunc;
+    }
+    
+    @Override
+    public void checkOutputSpecs(JobContext jobContext) throws IOException,
+            InterruptedException {
+        OutputFormat<BytesWritable,Tuple> outputFormat =  storeFunc.getOutputFormat();
+        outputFormat.checkOutputSpecs(jobContext);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
+            throws IOException, InterruptedException {
+        String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(serializedJobInfo);
+        ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
+        String location = outputJobInfo.getLocation();
+        OutputFormat<BytesWritable,Tuple> outputFormat =  storeFunc.getOutputFormat();
+        return new StoreFuncBasedOutputCommitter(storeFunc, outputFormat.getOutputCommitter(ctx), location, rs);
+    }
+
+    @Override
+    public RecordWriter<BytesWritable, Tuple> getRecordWriter(
+            TaskAttemptContext ctx) throws IOException, InterruptedException {
+        RecordWriter<BytesWritable,Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx);
+        String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(serializedJobInfo);
+        ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
+        String location = outputJobInfo.getLocation();
+        return new StoreFuncBasedRecordWriter(writer, storeFunc, location, rs);
+    }
+    
+    static class StoreFuncBasedRecordWriter extends RecordWriter<BytesWritable, Tuple> {
+        private final RecordWriter<BytesWritable,Tuple> writer;
+        private final StoreFuncInterface storeFunc;
+        private final ResourceSchema schema;
+        private final String location;
+        
+        public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable,Tuple> writer, StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException {
+            this.writer = writer;
+            this.storeFunc = sf;
+            this.schema = rs;
+            this.location = location;
+            storeFunc.prepareToWrite(writer);
+        }
+        
+        @Override
+        public void close(TaskAttemptContext ctx) throws IOException,
+                InterruptedException {
+            writer.close(ctx);
+        }
+
+        @Override
+        public void write(BytesWritable key, Tuple value) throws IOException,
+                InterruptedException {
+            storeFunc.putNext(value);
+        }
+    }
+    
+    static class StoreFuncBasedOutputCommitter extends OutputCommitter {
+        StoreFuncInterface sf;
+        OutputCommitter wrappedOutputCommitter;
+        String location;
+        ResourceSchema rs;
+        public StoreFuncBasedOutputCommitter(StoreFuncInterface sf, OutputCommitter outputCommitter, String location, ResourceSchema rs) {
+            this.sf = sf;
+            this.wrappedOutputCommitter = outputCommitter;
+            this.location = location;
+            this.rs = rs;
+        }
+        @Override
+        public void abortTask(TaskAttemptContext context) throws IOException {
+            wrappedOutputCommitter.abortTask(context);
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext context) throws IOException {
+            wrappedOutputCommitter.commitTask(context);
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext context)
+                throws IOException {
+            return wrappedOutputCommitter.needsTaskCommit(context);
+        }
+
+        @Override
+        public void setupJob(JobContext context) throws IOException {
+            wrappedOutputCommitter.setupJob(context);
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext context) throws IOException {
+            wrappedOutputCommitter.setupTask(context);
+        }
+        
+        public void commitJob(JobContext context) throws IOException {
+            wrappedOutputCommitter.commitJob(context);
+            if (sf instanceof StoreMetadata) {
+                if (rs != null) {
+                    ((StoreMetadata) sf).storeSchema(
+                            rs, location, new Job(context.getConfiguration()) );
+                }
+            }
+        }
+        
+        @Override
+        public void cleanupJob(JobContext context) throws IOException {
+            wrappedOutputCommitter.cleanupJob(context);
+            if (sf instanceof StoreMetadata) {
+                if (rs != null) {
+                    ((StoreMetadata) sf).storeSchema(
+                            rs, location, new Job(context.getConfiguration()) );
+                }
+            }
+        }
+        
+        public void abortJob(JobContext context, JobStatus.State state) throws IOException {
+            wrappedOutputCommitter.abortJob(context, state);
+        }
+    }
+}

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java Tue Nov  1 21:14:50 2011
@@ -95,7 +95,7 @@ import org.apache.hcatalog.mapreduce.Fil
    */
   @SuppressWarnings("unchecked")
   @Override
-  public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+  public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException {
     if( outputFormat == null ) {
       outputFormat = new RCFileMapReduceOutputFormat();
     }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java Tue Nov  1 21:14:50 2011
@@ -348,9 +348,7 @@ public class TestSemanticAnalysis extend
     query =  "create table junit_sem_analysis (a int) partitioned by (b string)  stored as TEXTFILE";
 
     CommandProcessorResponse response = hcatDriver.run(query);
-    assertEquals(10,response.getResponseCode());
-    assertEquals("FAILED: Error in semantic analysis: Operation not supported. HCatalog doesn't support Text File by default yet. You may specify it through INPUT/OUTPUT storage drivers.",
-        response.getErrorMessage());
+    assertEquals(0,response.getResponseCode());
 
   }
 

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java?rev=1196278&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorage.java Tue Nov  1 21:14:50 2011
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+
+public class MyPigStorage extends PigStorage {
+
+  String arg2;
+  public MyPigStorage(String arg1, String arg2) throws IOException {
+    super(arg1);
+    this.arg2 = arg2;
+  }
+  
+  @Override
+  public void putNext(Tuple t) throws IOException {
+      t.append(arg2);
+      super.putNext(t);
+  }
+}

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MyPigStorageDriver.java Tue Nov  1 21:14:50 2011
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
-
-public class MyPigStorageDriver extends PigStorageInputDriver{
-
-  @Override
-  public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-    if ( !"control-A".equals(storageDriverArgs.getProperty(PigStorageInputDriver.delim))){
-      /* This is the only way to make testcase fail. Throwing exception from
-       * here doesn't propagate up.
-       */
-      System.exit(1);
-    }
-    super.initialize(context, storageDriverArgs);
-  }
-}

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java Tue Nov  1 21:14:50 2011
@@ -21,7 +21,9 @@ import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -45,8 +47,8 @@ import org.apache.hadoop.hive.ql.io.RCFi
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.pig.HCatLoader;
-import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
@@ -59,6 +61,9 @@ public class TestPigStorageDriver extend
   private HiveConf hcatConf;
   private Driver hcatDriver;
   private HiveMetaStoreClient msc;
+  private static String tblLocation = "/tmp/test_pig/data";
+  private static String anyExistingFileInCurDir = "ivy.xml";
+  private static String warehouseDir = "/tmp/hcat_junit_warehouse";
 
   @Override
   protected void setUp() throws Exception {
@@ -81,57 +86,91 @@ public class TestPigStorageDriver extend
 
   public void testPigStorageDriver() throws IOException, CommandNeedRetryException{
 
-
     String fsLoc = hcatConf.get("fs.default.name");
-    Path tblPath = new Path(fsLoc, "/tmp/test_pig/data");
-    String anyExistingFileInCurDir = "ivy.xml";
+    Path tblPath = new Path(fsLoc, tblLocation);
+    String tblName = "junit_pigstorage";
     tblPath.getFileSystem(hcatConf).copyFromLocalFile(new Path(anyExistingFileInCurDir),tblPath);
 
-    hcatDriver.run("drop table junit_pigstorage");
+    hcatDriver.run("drop table " + tblName);
     CommandProcessorResponse resp;
-    String createTable = "create table junit_pigstorage (a string) partitioned by (b string) stored as RCFILE";
+    String createTable = "create table " + tblName + " (a string) partitioned by (b string) stored as TEXTFILE";
 
     resp = hcatDriver.run(createTable);
     assertEquals(0, resp.getResponseCode());
     assertNull(resp.getErrorMessage());
 
-    resp = hcatDriver.run("alter table junit_pigstorage add partition (b='2010-10-10') location '"+new Path(fsLoc, "/tmp/test_pig")+"'");
+    resp = hcatDriver.run("alter table " + tblName + " add partition (b='2010-10-10') location '"+new Path(fsLoc, "/tmp/test_pig")+"'");
     assertEquals(0, resp.getResponseCode());
     assertNull(resp.getErrorMessage());
 
-    resp = hcatDriver.run("alter table junit_pigstorage partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
-        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+PigStorageInputDriver.class.getName()+"' outputdriver 'non-existent'");
+    resp = hcatDriver.run("alter table " + tblName + " partition (b='2010-10-10') set fileformat TEXTFILE");
     assertEquals(0, resp.getResponseCode());
     assertNull(resp.getErrorMessage());
 
-    resp =  hcatDriver.run("desc extended junit_pigstorage partition (b='2010-10-10')");
+    resp =  hcatDriver.run("desc extended " + tblName + " partition (b='2010-10-10')");
     assertEquals(0, resp.getResponseCode());
     assertNull(resp.getErrorMessage());
 
     PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties());
     UDFContext.getUDFContext().setClientSystemProps();
-    server.registerQuery(" a = load 'junit_pigstorage' using "+HCatLoader.class.getName()+";");
+    server.registerQuery(" a = load '" + tblName + "' using "+HCatLoader.class.getName()+";");
     Iterator<Tuple> itr = server.openIterator("a");
-    DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(new File(anyExistingFileInCurDir))));
-    while(itr.hasNext()){
-      Tuple t = itr.next();
-      assertEquals(2, t.size());
-      if(t.get(0) != null) {
-        // If underlying data-field is empty. PigStorage inserts null instead
-        // of empty String objects.
-        assertTrue(t.get(0) instanceof String);
-        assertEquals(stream.readLine(), t.get(0));
-      }
-      else{
-        assertTrue(stream.readLine().isEmpty());
+    boolean result = compareWithFile(itr, anyExistingFileInCurDir, 2, "2010-10-10", null);
+    assertTrue(result);
+    
+    server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);");
+    server.store("a", tblName, HCatStorer.class.getName() + "('b=2010-10-11')");
+    
+    server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-10-11' using PigStorage() as (a:chararray);");
+    itr = server.openIterator("a");
+    result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-10-11", null);
+    assertTrue(result);
+    
+    // Test multi-store
+    server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);");
+    server.registerQuery("store a into '" + tblName + "' using " +  HCatStorer.class.getName() + "('b=2010-11-01');");
+    server.registerQuery("store a into '" + tblName + "' using " +  HCatStorer.class.getName() + "('b=2010-11-02');");
+    
+    server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-11-01' using PigStorage() as (a:chararray);");
+    itr = server.openIterator("a");
+    result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-11-01", null);
+    assertTrue(result);
+    
+    server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-11-02' using PigStorage() as (a:chararray);");
+    itr = server.openIterator("a");
+    result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-11-02", null);
+    assertTrue(result);
+    
+    hcatDriver.run("drop table " + tblName);
+  }
+  
+  private boolean compareWithFile(Iterator<Tuple> itr, String factFile, int numColumn, String key, String valueSuffix) throws IOException {
+      DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(new File(factFile))));
+      while(itr.hasNext()){
+        Tuple t = itr.next();
+        assertEquals(numColumn, t.size());
+        if(t.get(0) != null) {
+          // If underlying data-field is empty. PigStorage inserts null instead
+          // of empty String objects.
+          assertTrue(t.get(0) instanceof String);
+          String expected = stream.readLine();
+          if (valueSuffix!=null)
+              expected += valueSuffix;
+          assertEquals(expected, t.get(0));
+        }
+        else{
+          assertTrue(stream.readLine().isEmpty());
+        }
+        
+        if (numColumn>1) {
+            // The second column must be key
+            assertTrue(t.get(1) instanceof String);
+            assertEquals(key, t.get(1));
+        }
       }
-      assertTrue(t.get(1) instanceof String);
-
-      assertEquals("2010-10-10", t.get(1));
-    }
-    assertEquals(0,stream.available());
-    stream.close();
-    hcatDriver.run("drop table junit_pigstorage");
+      assertEquals(0,stream.available());
+      stream.close();
+      return true;
   }
 
   public void testDelim() throws MetaException, TException, UnknownTableException, NoSuchObjectException, InvalidOperationException, IOException, CommandNeedRetryException{
@@ -139,7 +178,7 @@ public class TestPigStorageDriver extend
     hcatDriver.run("drop table junit_pigstorage_delim");
 
     CommandProcessorResponse resp;
-    String createTable = "create table junit_pigstorage_delim (a string) partitioned by (b string) stored as RCFILE";
+    String createTable = "create table junit_pigstorage_delim (a0 string, a1 string) partitioned by (b string) stored as RCFILE";
 
     resp = hcatDriver.run(createTable);
 
@@ -150,12 +189,12 @@ public class TestPigStorageDriver extend
     assertEquals(0, resp.getResponseCode());
     assertNull(resp.getErrorMessage());
 
-    resp = hcatDriver.run("alter table junit_pigstorage_delim partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
-        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+MyPigStorageDriver.class.getName()+"' outputdriver 'non-existent'");
+    resp = hcatDriver.run("alter table junit_pigstorage_delim partition (b='2010-10-10') set fileformat TEXTFILE");
 
     Partition part = msc.getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", "b=2010-10-10");
     Map<String,String> partParms = part.getParameters();
-    partParms.put(PigStorageInputDriver.delim, "control-A");
+    partParms.put(HCatConstants.HCAT_PIG_LOADER_ARGS, "control-A");
+    partParms.put(HCatConstants.HCAT_PIG_STORER_ARGS, "control-A");
 
     msc.alter_partition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", part);
 
@@ -165,5 +204,67 @@ public class TestPigStorageDriver extend
     try{
       server.openIterator("a");
     }catch(FrontendException fe){}
+    
+    resp = hcatDriver.run("alter table junit_pigstorage_delim set fileformat TEXTFILE");
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+    resp = hcatDriver.run("alter table junit_pigstorage_delim set TBLPROPERTIES ('hcat.pig.loader.args'=':', 'hcat.pig.storer.args'=':')");
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+    
+    File inputFile = File.createTempFile("hcat_test", "");
+    PrintWriter p = new PrintWriter(new FileWriter(inputFile));
+    p.println("1\t2");
+    p.println("3\t4");
+    p.close();
+    server.registerQuery("a = load '"+inputFile.toString()+"' using PigStorage() as (a0:chararray, a1:chararray);");
+    server.store("a", "junit_pigstorage_delim", HCatStorer.class.getName() + "('b=2010-10-11')");
+    
+    server.registerQuery("a = load '/tmp/hcat_junit_warehouse/junit_pigstorage_delim/b=2010-10-11' using PigStorage() as (a:chararray);");
+    Iterator<Tuple> itr = server.openIterator("a");
+    
+    assertTrue(itr.hasNext());
+    Tuple t = itr.next();
+    assertTrue(t.get(0).equals("1:2"));
+    
+    assertTrue(itr.hasNext());
+    t = itr.next();
+    assertTrue(t.get(0).equals("3:4"));
+    
+    assertFalse(itr.hasNext());
+    inputFile.delete();
   }
+  
+  public void testMultiConstructArgs() throws MetaException, TException, UnknownTableException, NoSuchObjectException, InvalidOperationException, IOException, CommandNeedRetryException{
+
+      String fsLoc = hcatConf.get("fs.default.name");
+      Path tblPath = new Path(fsLoc, tblLocation);
+      String tblName = "junit_pigstorage_constructs";
+      tblPath.getFileSystem(hcatConf).copyFromLocalFile(new Path(anyExistingFileInCurDir),tblPath);
+
+      hcatDriver.run("drop table junit_pigstorage_constructs");
+
+      CommandProcessorResponse resp;
+      String createTable = "create table " + tblName + " (a string) partitioned by (b string) stored as TEXTFILE";
+
+      resp = hcatDriver.run(createTable);
+
+      assertEquals(0, resp.getResponseCode());
+      assertNull(resp.getErrorMessage());
+
+      resp = hcatDriver.run("alter table " + tblName + " set TBLPROPERTIES ('hcat.pig.storer'='org.apache.hcatalog.pig.MyPigStorage', 'hcat.pig.storer.args'=':#hello', 'hcat.pig.args.delimiter'='#')");
+      assertEquals(0, resp.getResponseCode());
+      assertNull(resp.getErrorMessage());
+      
+      PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties());
+      UDFContext.getUDFContext().setClientSystemProps();
+      
+      server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);");
+      server.store("a", tblName, HCatStorer.class.getName() + "('b=2010-10-11')");
+      
+      server.registerQuery("a = load '" + warehouseDir + "/" + tblName + "/b=2010-10-11' using PigStorage() as (a:chararray);");
+      Iterator<Tuple> itr = server.openIterator("a");
+      boolean result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-10-11", ":hello");
+      assertTrue(result);
+    }
 }

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java Tue Nov  1 21:14:50 2011
@@ -48,7 +48,7 @@ public class HBaseBulkOutputStorageDrive
     }
 
     @Override
-    public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+    public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException {
         return outputFormat;
     }
 

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java Tue Nov  1 21:14:50 2011
@@ -41,7 +41,7 @@ public class HBaseDirectOutputStorageDri
     }
 
     @Override
-    public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+    public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException {
         return outputFormat;
     }
 

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java?rev=1196278&r1=1196277&r2=1196278&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java Tue Nov  1 21:14:50 2011
@@ -60,7 +60,7 @@ public class HBaseOutputStorageDriver ex
     }
 
     @Override
-    public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+    public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException {
         return activeOSD.getOutputFormat();
     }