Posted to hcatalog-commits@incubator.apache.org by ha...@apache.org on 2011/08/01 17:17:06 UTC

svn commit: r1152865 [1/2] - in /incubator/hcatalog/trunk/src: java/org/apache/hcatalog/common/ java/org/apache/hcatalog/mapreduce/ java/org/apache/hcatalog/pig/ java/org/apache/hcatalog/rcfile/ test/org/apache/hcatalog/listener/ test/org/apache/hcatal...

Author: hashutosh
Date: Mon Aug  1 17:17:04 2011
New Revision: 1152865

URL: http://svn.apache.org/viewvc?rev=1152865&view=rev
Log:
Reverting HCatalog-64

Removed:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
Modified:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java

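API-level effect of this revert, as reflected in the diff below: InputJobInfo is removed; HCatInputFormat.setInput(Job, HCatTableInfo) and HCatOutputFormat.setOutput(Job, HCatTableInfo) once again take table-info objects built via HCatTableInfo.getInputTableInfo/getOutputTableInfo; partition values, filters and dynamic-partitioning keys move back onto HCatTableInfo; and read-side schema access returns to JobInfo.getTableSchema() instead of assembling data and partition columns from the table info. Minimal usage sketches for the restored read and write setup follow the HCatInputFormat.java and HCatOutputFormat.java diffs below.
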
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java Mon Aug  1 17:17:04 2011
@@ -18,8 +18,24 @@
 
 package org.apache.hcatalog.common;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -31,9 +47,8 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@@ -41,12 +56,10 @@ import org.apache.hcatalog.data.schema.H
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.thrift.TException;
 
-import java.io.*;
-import java.util.*;
-import java.util.Map.Entry;
-
 public class HCatUtil {
 
 //  static final private Log LOG = LogFactory.getLog(HCatUtil.class);
@@ -170,22 +183,6 @@ public class HCatUtil {
     }
 
   /**
-   * return the partition columns from a table instance
-   * @param table the instance to extract partition columns from
-   * @return HCatSchema instance which contains the partition columns
-   * @throws IOException
-   */
-  public static HCatSchema getPartitionColumns(Table table) throws IOException{
-      HCatSchema cols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      if( table.getPartitionKeys().size() != 0 ) {
-        for (FieldSchema fs : table.getPartitionKeys()){
-            cols.append(HCatSchemaUtils.getHCatFieldSchema(fs));
-        }
-      }
-      return cols;
-    }
-
-  /**
    * Validate partition schema, checks if the column types match between the partition
    * and the existing table schema. Returns the list of columns present in the partition
    * but not in the table.

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java Mon Aug  1 17:17:04 2011
@@ -18,19 +18,22 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
 public abstract class HCatBaseInputFormat extends InputFormat<WritableComparable, HCatRecord> {
   
   /**
@@ -72,15 +75,15 @@ public abstract class HCatBaseInputForma
 
     //Get the job info from the configuration,
     //throws exception if not initialized
-    InputJobInfo inputJobInfo;
+    JobInfo jobInfo;
     try {
-      inputJobInfo = getJobInfo(jobContext);
+      jobInfo = getJobInfo(jobContext);
     } catch (Exception e) {
       throw new IOException(e);
     }
 
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
+    List<PartInfo> partitionInfoList = jobInfo.getPartitions();
     if(partitionInfoList == null ) {
       //No partitions match the specified partition filter
       return splits;
@@ -96,14 +99,8 @@ public abstract class HCatBaseInputForma
         throw new IOException(e);
       }
 
-      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields())
-          allCols.append(field);
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields())
-          allCols.append(field);
-
       //Pass all required information to the storage driver
-      initStorageDriver(storageDriver, localJob, partitionInfo, allCols);
+      initStorageDriver(storageDriver, localJob, partitionInfo, jobInfo.getTableSchema());
 
       //Get the input format for the storage driver
       InputFormat inputFormat =
@@ -117,7 +114,7 @@ public abstract class HCatBaseInputForma
         splits.add(new HCatSplit(
             partitionInfo,
             split,
-            allCols));
+            jobInfo.getTableSchema()));
       }
     }
 
@@ -142,10 +139,10 @@ public abstract class HCatBaseInputForma
     HCatSplit hcatSplit = (HCatSplit) split;
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
 
-    //If running through a Pig job, the InputJobInfo will not be available in the
+    //If running through a Pig job, the JobInfo will not be available in the
     //backend process context (since HCatLoader works on a copy of the JobContext and does
     //not call HCatInputFormat.setInput in the backend process).
-    //So this function should NOT attempt to read the InputJobInfo.
+    //So this function should NOT attempt to read the JobInfo.
 
     HCatInputStorageDriver storageDriver;
     try {
@@ -177,30 +174,25 @@ public abstract class HCatBaseInputForma
    * @throws Exception if HCatInputFromat.setInput has not been called for the current context
    */
   public static HCatSchema getTableSchema(JobContext context) throws Exception {
-    InputJobInfo inputJobInfo = getJobInfo(context);
-      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields())
-          allCols.append(field);
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields())
-          allCols.append(field);
-    return allCols;
+    JobInfo jobInfo = getJobInfo(context);
+    return jobInfo.getTableSchema();
   }
 
   /**
-   * Gets the InputJobInfo object by reading the Configuration and deserializing
-   * the string. If InputJobInfo is not present in the configuration, throws an
+   * Gets the JobInfo object by reading the Configuration and deserializing
+   * the string. If JobInfo is not present in the configuration, throws an
    * exception since that means HCatInputFormat.setInput has not been called.
    * @param jobContext the job context
-   * @return the InputJobInfo object
+   * @return the JobInfo object
    * @throws Exception the exception
    */
-  private static InputJobInfo getJobInfo(JobContext jobContext) throws Exception {
+  private static JobInfo getJobInfo(JobContext jobContext) throws Exception {
     String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
     if( jobString == null ) {
       throw new Exception("job information not found in JobContext. HCatInputFormat.setInput() not called?");
     }
 
-    return (InputJobInfo) HCatUtil.deserialize(jobString);
+    return (JobInfo) HCatUtil.deserialize(jobString);
   }
 
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Mon Aug  1 17:17:04 2011
@@ -18,6 +18,14 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.io.Writable;
@@ -31,11 +39,6 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 public abstract class HCatBaseOutputFormat extends OutputFormat<WritableComparable<?>, HCatRecord> {
 
 //  static final private Log LOG = LogFactory.getLog(HCatBaseOutputFormat.class);
@@ -49,7 +52,7 @@ public abstract class HCatBaseOutputForm
    */
   public static HCatSchema getTableSchema(JobContext context) throws IOException {
       OutputJobInfo jobInfo = getJobInfo(context);
-      return jobInfo.getTableInfo().getDataColumns();
+      return jobInfo.getTableSchema();
   }
 
   /**
@@ -81,7 +84,7 @@ public abstract class HCatBaseOutputForm
 
   /**
    * Gets the HCatOuputJobInfo object by reading the Configuration and deserializing
-   * the string. If InputJobInfo is not present in the configuration, throws an
+   * the string. If JobInfo is not present in the configuration, throws an
    * exception since that means HCatOutputFormat.setOutput has not been called.
    * @param jobContext the job context
    * @return the OutputJobInfo object
@@ -122,15 +125,15 @@ public abstract class HCatBaseOutputForm
       try {
           Class<? extends HCatOutputStorageDriver> driverClass =
               (Class<? extends HCatOutputStorageDriver>)
-              Class.forName(jobInfo.getTableInfo().getStorerInfo().getOutputSDClass());
+              Class.forName(jobInfo.getStorerInfo().getOutputSDClass());
           HCatOutputStorageDriver driver = driverClass.newInstance();
 
-          Map<String, String> partitionValues = jobInfo.getPartitionValues();
+          Map<String, String> partitionValues = jobInfo.getTableInfo().getPartitionValues();
           String location = jobInfo.getLocation();
 
           if (dynamicPartVals != null){
             // dynamic part vals specified
-            List<String> dynamicPartKeys = jobInfo.getDynamicPartitioningKeys();
+            List<String> dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys();
             if (dynamicPartVals.size() != dynamicPartKeys.size()){
               throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, 
                   "Unable to instantiate dynamic partitioning storage driver, mismatch between"
@@ -142,7 +145,7 @@ public abstract class HCatBaseOutputForm
             }
 
             // re-home location, now that we know the rest of the partvals
-            Table table = jobInfo.getTableInfo().getTable();
+            Table table = jobInfo.getTable();
             
             List<String> partitionCols = new ArrayList<String>();
             for(FieldSchema schema : table.getPartitionKeys()) {
@@ -161,7 +164,7 @@ public abstract class HCatBaseOutputForm
           
 //          HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues);
 
-          driver.initialize(jobContext, jobInfo.getTableInfo().getStorerInfo().getProperties());
+          driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties());
 
           return driver;
       } catch(Exception e) {
@@ -176,7 +179,7 @@ public abstract class HCatBaseOutputForm
   /**
    * Gets the output storage driver instance, with allowing specification 
    * of partvals from which it picks the dynamic partvals
-   * @param context the job context
+   * @param jobContext the job context
    * @param jobInfo the output job info
    * @return the output driver instance
    * @throws IOException
@@ -185,7 +188,7 @@ public abstract class HCatBaseOutputForm
   protected static HCatOutputStorageDriver getOutputDriverInstance(
       JobContext context, OutputJobInfo jobInfo,
       Map<String, String> fullPartSpec) throws IOException {
-    List<String> dynamicPartKeys = jobInfo.getDynamicPartitioningKeys();
+    List<String> dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys();
     if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){
       return getOutputDriverInstance(context,jobInfo,(List<String>)null);
     }else{
@@ -224,8 +227,8 @@ public abstract class HCatBaseOutputForm
     // These would be partition keys too, so would also need to be removed from 
     // output schema and partcols
 
-    if (jobInfo.isDynamicPartitioningUsed()){
-      for (String partKey : jobInfo.getDynamicPartitioningKeys()){
+    if (jobInfo.getTableInfo().isDynamicPartitioningUsed()){
+      for (String partKey : jobInfo.getTableInfo().getDynamicPartitioningKeys()){
         Integer idx;
         if((idx = schema.getPosition(partKey)) != null){
           posOfPartCols.add(idx);
@@ -235,7 +238,7 @@ public abstract class HCatBaseOutputForm
       }
     }
     
-    HCatUtil.validatePartitionSchema(jobInfo.getTableInfo().getTable(), schemaWithoutParts);
+    HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts);
     jobInfo.setPosOfPartCols(posOfPartCols);
     jobInfo.setPosOfDynPartCols(posOfDynPartCols);
     jobInfo.setOutputSchema(schemaWithoutParts);

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java Mon Aug  1 17:17:04 2011
@@ -18,6 +18,15 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -30,11 +39,6 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-
 /** The InputFormat to use to read data from HCat */
 public class HCatEximInputFormat extends HCatBaseInputFormat {
 
@@ -64,7 +68,8 @@ public class HCatEximInputFormat extends
       Map.Entry<org.apache.hadoop.hive.metastore.api.Table, List<Partition>> tp = EximUtil
       .readMetaData(fs, metadataPath);
       org.apache.hadoop.hive.metastore.api.Table table = tp.getKey();
-      InputJobInfo inputInfo = InputJobInfo.create(table.getDbName(), table.getTableName(),null,null,null);
+      HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(null,
+          null, table.getDbName(), table.getTableName());
       List<FieldSchema> partCols = table.getPartitionKeys();
       List<PartInfo> partInfoList = null;
       if (partCols.size() > 0) {
@@ -93,11 +98,11 @@ public class HCatEximInputFormat extends
         PartInfo partInfo = new PartInfo(schema, inputStorageDriverClass,  location + "/data", hcatProperties);
         partInfoList.add(partInfo);
       }
-      inputInfo.setPartitions(partInfoList);
-      inputInfo.setTableInfo(HCatTableInfo.valueOf(table));
+      JobInfo hcatJobInfo = new JobInfo(inputInfo,
+          HCatUtil.getTableSchemaWithPtnCols(table), partInfoList);
       job.getConfiguration().set(
           HCatConstants.HCAT_KEY_JOB_INFO,
-          HCatUtil.serialize(inputInfo));
+          HCatUtil.serialize(hcatJobInfo));
       List<HCatSchema> rv = new ArrayList<HCatSchema>(2);
       rv.add(HCatSchemaUtils.getHCatSchema(table.getSd().getCols()));
       rv.add(HCatSchemaUtils.getHCatSchema(partCols));

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java Mon Aug  1 17:17:04 2011
@@ -18,6 +18,13 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,16 +40,10 @@ import org.apache.hadoop.hive.ql.parse.E
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatException;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 public class HCatEximOutputCommitter extends HCatBaseOutputCommitter {
 
   private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class);
@@ -64,7 +65,7 @@ public class HCatEximOutputCommitter ext
     Configuration conf = jobContext.getConfiguration();
     FileSystem fs;
     try {
-      fs = FileSystem.get(new URI(jobInfo.getTableInfo().getTable().getSd().getLocation()), conf);
+      fs = FileSystem.get(new URI(jobInfo.getTable().getSd().getLocation()), conf);
     } catch (URISyntaxException e) {
       throw new IOException(e);
     }
@@ -74,7 +75,7 @@ public class HCatEximOutputCommitter ext
   private static void doCleanup(OutputJobInfo jobInfo, FileSystem fs) throws IOException,
       HCatException {
     try {
-      Table ttable = jobInfo.getTableInfo().getTable();
+      Table ttable = jobInfo.getTable();
       org.apache.hadoop.hive.ql.metadata.Table table = new org.apache.hadoop.hive.ql.metadata.Table(
           ttable);
       StorageDescriptor tblSD = ttable.getSd();
@@ -95,7 +96,7 @@ public class HCatEximOutputCommitter ext
         }
       }
       if (!table.getPartitionKeys().isEmpty()) {
-        Map<String, String> partitionValues = jobInfo.getPartitionValues();
+        Map<String, String> partitionValues = jobInfo.getTableInfo().getPartitionValues();
         org.apache.hadoop.hive.ql.metadata.Partition partition =
             new org.apache.hadoop.hive.ql.metadata.Partition(table,
                 partitionValues,

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java Mon Aug  1 17:17:04 2011
@@ -18,6 +18,12 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -31,7 +37,11 @@ import org.apache.hadoop.hive.ql.io.RCFi
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -43,12 +53,6 @@ import org.apache.hcatalog.data.schema.H
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-
 /**
  * The OutputFormat to use to write data to HCat without a hcat server. This can then
  * be imported into a hcat instance, or used with a HCatEximInputFormat. As in
@@ -124,7 +128,8 @@ public class HCatEximOutputFormat extend
       }
     }
     StorerInfo storerInfo = new StorerInfo(isdname, osdname, new Properties());
-    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbname,tablename,partSpec,null,null);
+    HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(null, null, dbname, tablename,
+        partSpec);
     org.apache.hadoop.hive.ql.metadata.Table tbl = new
       org.apache.hadoop.hive.ql.metadata.Table(dbname, tablename);
     Table table = tbl.getTTable();
@@ -141,17 +146,16 @@ public class HCatEximOutputFormat extend
       StorageDescriptor sd = table.getSd();
       sd.setLocation(location);
       String dataLocation = location + "/" + partname;
-      outputJobInfo.setTableInfo(new HCatTableInfo(dbname,tablename,columnSchema,null,storerInfo,table));
-      outputJobInfo.setOutputSchema(columnSchema);
-      outputJobInfo.setLocation(dataLocation);
-      setPartDetails(outputJobInfo, columnSchema, partSpec);
-      sd.setCols(HCatUtil.getFieldSchemaList(outputJobInfo.getOutputSchema().getFields()));
+      OutputJobInfo jobInfo = new OutputJobInfo(outputInfo,
+          columnSchema, columnSchema, storerInfo, dataLocation, table);
+      setPartDetails(jobInfo, columnSchema, partSpec);
+      sd.setCols(HCatUtil.getFieldSchemaList(jobInfo.getOutputSchema().getFields()));
       sd.setInputFormat(ifname);
       sd.setOutputFormat(ofname);
       SerDeInfo serdeInfo = sd.getSerdeInfo();
       serdeInfo.setSerializationLib(serializationLib);
       Configuration conf = job.getConfiguration();
-      conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+      conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
     } catch (IOException e) {
       throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
     } catch (MetaException e) {

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java Mon Aug  1 17:17:04 2011
@@ -18,10 +18,10 @@
 
 package org.apache.hcatalog.mapreduce;
 
-import org.apache.hadoop.mapreduce.Job;
-
 import java.io.IOException;
 
+import org.apache.hadoop.mapreduce.Job;
+
 /** The InputFormat to use to read data from HCat */
 public class HCatInputFormat extends HCatBaseInputFormat {
 
@@ -31,13 +31,13 @@ public class HCatInputFormat extends HCa
    * the information in the conf object. The inputInfo object is updated with
    * information needed in the client context
    * @param job the job object
-   * @param inputJobInfo the input info for table to read
+   * @param inputInfo the table input info
    * @throws IOException the exception in communicating with the metadata server
    */
   public static void setInput(Job job,
-      InputJobInfo inputJobInfo) throws IOException {
+      HCatTableInfo inputInfo) throws IOException {
     try {
-      InitializeInput.setInput(job, inputJobInfo);
+      InitializeInput.setInput(job, inputInfo);
     } catch (Exception e) {
       throw new IOException(e);
     }

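For reference, a minimal read-side setup sketch against the restored signatures above (the metastore URI, database and table names are placeholders, and the kerberos principal is null on the assumption that security is off; only getInputTableInfo, setInput and getTableSchema come from the code in this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    public class HCatReadSetupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "hcat-read-sketch");

        // After the revert, setInput takes an HCatTableInfo rather than an InputJobInfo.
        // Server URI, principal (null = no security), db and table names are placeholders.
        HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
            "thrift://metastore-host:9083", null, "default", "mytable");
        HCatInputFormat.setInput(job, inputInfo);
        job.setInputFormatClass(HCatInputFormat.class);

        // The full table schema (data plus partition columns) is read back from the
        // JobInfo that setInput serialized into the job configuration.
        HCatSchema schema = HCatInputFormat.getTableSchema(job);
        System.out.println(schema);
      }
    }
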
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java Mon Aug  1 17:17:04 2011
@@ -17,6 +17,17 @@
  */
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -26,11 +37,18 @@ import org.apache.hadoop.hive.common.Fil
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.Constants;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.common.ErrorType;
@@ -43,11 +61,6 @@ import org.apache.hcatalog.data.schema.H
 import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
 import org.apache.thrift.TException;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-
 public class HCatOutputCommitter extends OutputCommitter {
 
 //    static final private Log LOG = LogFactory.getLog(HCatOutputCommitter.class);
@@ -67,7 +80,7 @@ public class HCatOutputCommitter extends
 
     public HCatOutputCommitter(JobContext context, OutputCommitter baseCommitter) throws IOException {
       OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
-      dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
+      dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed();
       if (!dynamicPartitioningUsed){
         this.baseCommitter = baseCommitter;
         this.partitionsDiscovered = true;
@@ -148,7 +161,7 @@ public class HCatOutputCommitter extends
 
       try {
         HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(
-            jobInfo.getServerUri(), jobContext.getConfiguration());
+            jobInfo.getTableInfo().getServerUri(), jobContext.getConfiguration());
         // cancel the deleg. tokens that were acquired for this job now that
         // we are done - we should cancel if the tokens were acquired by
         // HCatOutputFormat and not if they were supplied by Oozie. In the latter
@@ -176,7 +189,7 @@ public class HCatOutputCommitter extends
       Path src; 
       if (dynamicPartitioningUsed){
         src = new Path(getPartitionRootLocation(
-            jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize()
+            jobInfo.getLocation().toString(),jobInfo.getTable().getPartitionKeysSize()
             ));
       }else{
         src = new Path(jobInfo.getLocation());
@@ -231,7 +244,7 @@ public class HCatOutputCommitter extends
 
       OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
       Configuration conf = context.getConfiguration();
-      Table table = jobInfo.getTableInfo().getTable();
+      Table table = jobInfo.getTable();
       Path tblPath = new Path(table.getSd().getLocation());
       FileSystem fs = tblPath.getFileSystem(conf);
 
@@ -270,7 +283,7 @@ public class HCatOutputCommitter extends
       List<Partition> partitionsAdded = new ArrayList<Partition>();
 
       try {
-        client = HCatOutputFormat.createHiveClient(jobInfo.getServerUri(), conf);
+        client = HCatOutputFormat.createHiveClient(tableInfo.getServerUri(), conf);
 
         StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
 
@@ -285,7 +298,7 @@ public class HCatOutputCommitter extends
           partitionsToAdd.add(
               constructPartition(
                   context,
-                  tblPath.toString(), jobInfo.getPartitionValues()
+                  tblPath.toString(), tableInfo.getPartitionValues()
                   ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
                   ,table, fs
                   ,grpName,perms));
@@ -303,35 +316,35 @@ public class HCatOutputCommitter extends
 
         //Publish the new partition(s)
         if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+          
+          Path src = new Path(ptnRootLocation);
 
-	          Path src = new Path(ptnRootLocation);
-
-	          // check here for each dir we're copying out, to see if it already exists, error out if so
-	          moveTaskOutputs(fs, src, src, tblPath,true);
-
-	          moveTaskOutputs(fs, src, src, tblPath,false);
-	          fs.delete(src, true);
-
+          // check here for each dir we're copying out, to see if it already exists, error out if so
+          moveTaskOutputs(fs, src, src, tblPath,true);
+          
+          moveTaskOutputs(fs, src, src, tblPath,false);
+          fs.delete(src, true);
+          
+          
+//          for (Partition partition : partitionsToAdd){
+//            partitionsAdded.add(client.add_partition(partition));
+//            // currently following add_partition instead of add_partitions because latter isn't 
+//            // all-or-nothing and we want to be able to roll back partitions we added if need be.
+//          }
 
-	//          for (Partition partition : partitionsToAdd){
-	//            partitionsAdded.add(client.add_partition(partition));
-	//            // currently following add_partition instead of add_partitions because latter isn't
-	//            // all-or-nothing and we want to be able to roll back partitions we added if need be.
-	//          }
-
-	          try {
-	            client.add_partitions(partitionsToAdd);
-	            partitionsAdded = partitionsToAdd;
-	          } catch (Exception e){
-	            // There was an error adding partitions : rollback fs copy and rethrow
-	            for (Partition p : partitionsToAdd){
-	              Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
-	              if (fs.exists(ptnPath)){
-	                fs.delete(ptnPath,true);
-	              }
-	            }
-	            throw e;
-	          }
+          try {
+            client.add_partitions(partitionsToAdd);
+            partitionsAdded = partitionsToAdd;
+          } catch (Exception e){
+            // There was an error adding partitions : rollback fs copy and rethrow
+            for (Partition p : partitionsToAdd){
+              Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
+              if (fs.exists(ptnPath)){
+                fs.delete(ptnPath,true);
+              }
+            }
+            throw e;
+          }
 
         }else{
           // no harProcessor, regular operation

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Mon Aug  1 17:17:04 2011
@@ -18,6 +18,17 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,11 +41,17 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
 import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -49,10 +66,6 @@ import org.apache.hcatalog.data.HCatReco
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.thrift.TException;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-
 /** The OutputFormat to use to write data to HCat. The key value is ignored and
  * and should be given as null. The value is the HCatRecord to write.*/
 public class HCatOutputFormat extends HCatBaseOutputFormat {
@@ -81,41 +94,41 @@ public class HCatOutputFormat extends HC
      * Set the info about the output to write for the Job. This queries the metadata server
      * to find the StorageDriver to use for the table.  Throws error if partition is already published.
      * @param job the job object
-     * @param outputJobInfo the table output info
+     * @param outputInfo the table output info
      * @throws IOException the exception in communicating with the metadata server
      */
     @SuppressWarnings("unchecked")
-    public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException {
+    public static void setOutput(Job job, HCatTableInfo outputInfo) throws IOException {
       HiveMetaStoreClient client = null;
 
       try {
 
         Configuration conf = job.getConfiguration();
-        client = createHiveClient(outputJobInfo.getServerUri(), conf);
-        Table table = client.getTable(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName());
+        client = createHiveClient(outputInfo.getServerUri(), conf);
+        Table table = client.getTable(outputInfo.getDatabaseName(), outputInfo.getTableName());
 
         if (table.getPartitionKeysSize() == 0 ){
-          if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())){
+          if ((outputInfo.getPartitionValues() != null) && (!outputInfo.getPartitionValues().isEmpty())){
             // attempt made to save partition values in non-partitioned table - throw error.
             throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, 
                 "Partition values specified for non-partitioned table");
           }
           // non-partitioned table
-          outputJobInfo.setPartitionValues(new HashMap<String, String>());
+          outputInfo.setPartitionValues(new HashMap<String, String>());
           
         } else {
           // partitioned table, we expect partition values
           // convert user specified map to have lower case key names
           Map<String, String> valueMap = new HashMap<String, String>();
-          if (outputJobInfo.getPartitionValues() != null){
-            for(Map.Entry<String, String> entry : outputJobInfo.getPartitionValues().entrySet()) {
+          if (outputInfo.getPartitionValues() != null){
+            for(Map.Entry<String, String> entry : outputInfo.getPartitionValues().entrySet()) {
               valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
             }
           }
 
           if (
-              (outputJobInfo.getPartitionValues() == null)
-              || (outputJobInfo.getPartitionValues().size() < table.getPartitionKeysSize())
+              (outputInfo.getPartitionValues() == null)
+              || (outputInfo.getPartitionValues().size() < table.getPartitionKeysSize())
           ){
             // dynamic partition usecase - partition values were null, or not all were specified
             // need to figure out which keys are not specified.
@@ -132,7 +145,7 @@ public class HCatOutputFormat extends HC
               throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified");
             }
                         
-            outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
+            outputInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
             String dynHash;
             if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){
               dynHash = String.valueOf(Math.random());
@@ -144,11 +157,11 @@ public class HCatOutputFormat extends HC
 
           }
 
-          outputJobInfo.setPartitionValues(valueMap);
+          outputInfo.setPartitionValues(valueMap);
         }
 
         //Handle duplicate publish
-        handleDuplicatePublish(job, outputJobInfo, client, table);
+        handleDuplicatePublish(job, outputInfo, client, table);
 
         StorageDescriptor tblSD = table.getSd();
         HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD);
@@ -166,15 +179,14 @@ public class HCatOutputFormat extends HC
         String tblLocation = tblSD.getLocation();
         String location = driver.getOutputLocation(job,
             tblLocation, partitionCols,
-            outputJobInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
+            outputInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
 
         //Serialize the output info into the configuration
-        outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
-        outputJobInfo.setOutputSchema(tableSchema);
-        outputJobInfo.setLocation(location);
-        outputJobInfo.setHarRequested(harRequested);
-        outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
-        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+        OutputJobInfo jobInfo = new OutputJobInfo(outputInfo,
+                tableSchema, tableSchema, storerInfo, location, table);
+        jobInfo.setHarRequested(harRequested);
+        jobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
+        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
 
         Path tblPath = new Path(tblLocation);
 
@@ -221,7 +233,7 @@ public class HCatOutputFormat extends HC
             // will correctly pick the right tokens which the committer will use and
             // cancel.
             
-            String tokenSignature = getTokenSignature(outputJobInfo);
+            String tokenSignature = getTokenSignature(outputInfo);
             if(tokenMap.get(tokenSignature) == null) {
               // get delegation tokens from hcat server and store them into the "job"
               // These will be used in the HCatOutputCommitter to publish partitions to
@@ -271,17 +283,17 @@ public class HCatOutputFormat extends HC
 
     // a signature string to associate with a HCatTableInfo - essentially
     // a concatenation of dbname, tablename and partition keyvalues.
-    private static String getTokenSignature(OutputJobInfo outputJobInfo) {
+    private static String getTokenSignature(HCatTableInfo outputInfo) {
       StringBuilder result = new StringBuilder("");
-      String dbName = outputJobInfo.getDatabaseName();
+      String dbName = outputInfo.getDatabaseName();
       if(dbName != null) {
         result.append(dbName);
       }
-      String tableName = outputJobInfo.getTableName();
+      String tableName = outputInfo.getTableName();
       if(tableName != null) {
         result.append("+" + tableName);
       }
-      Map<String, String> partValues = outputJobInfo.getPartitionValues();
+      Map<String, String> partValues = outputInfo.getPartitionValues();
       if(partValues != null) {
         for(Entry<String, String> entry: partValues.entrySet()) {
           result.append("+" + entry.getKey() + "=" + entry.getValue());
@@ -302,7 +314,7 @@ public class HCatOutputFormat extends HC
      * @throws MetaException
      * @throws TException
      */
-    private static void handleDuplicatePublish(Job job, OutputJobInfo outputInfo,
+    private static void handleDuplicatePublish(Job job, HCatTableInfo outputInfo,
         HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
 
       /*
@@ -354,7 +366,7 @@ public class HCatOutputFormat extends HC
     public static void setSchema(final Job job, final HCatSchema schema) throws IOException {
 
         OutputJobInfo jobInfo = getJobInfo(job);
-        Map<String,String> partMap = jobInfo.getPartitionValues();
+        Map<String,String> partMap = jobInfo.getTableInfo().getPartitionValues();
         setPartDetails(jobInfo, schema, partMap);
         job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
     }
@@ -461,7 +473,7 @@ public class HCatOutputFormat extends HC
       OutputJobInfo info =  HCatBaseOutputFormat.getJobInfo(context);
 //      Path workFile = osd.getWorkFilePath(context,info.getLocation());
       Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir"));
-      Path tblPath = new Path(info.getTableInfo().getTable().getSd().getLocation());
+      Path tblPath = new Path(info.getTable().getSd().getLocation());
       FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
       FileStatus tblPathStat = fs.getFileStatus(tblPath);
       

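Likewise, a minimal write-side setup sketch against the restored setOutput/setSchema signatures above (metastore URI, database, table and the partition key/value are placeholders; getOutputTableInfo, setOutput, getTableSchema and setSchema are the calls shown in this diff):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatOutputFormat;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;

    public class HCatWriteSetupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "hcat-write-sketch");

        // Static partition values to publish; leaving some (or all) keys out makes
        // setOutput treat them as dynamic partitioning keys (see the diff above).
        Map<String, String> partitionValues = new HashMap<String, String>();
        partitionValues.put("ds", "20110801");

        // After the revert, setOutput takes an HCatTableInfo built via getOutputTableInfo
        // instead of an OutputJobInfo.
        HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(
            "thrift://metastore-host:9083", null, "default", "mytable", partitionValues);
        HCatOutputFormat.setOutput(job, outputInfo);
        job.setOutputFormatClass(HCatOutputFormat.class);

        // Publish the schema of the records being written; here simply the table's
        // own column schema as returned after setOutput.
        HCatSchema schema = HCatOutputFormat.getTableSchema(job);
        HCatOutputFormat.setSchema(job, schema);
      }
    }
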
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java Mon Aug  1 17:17:04 2011
@@ -62,7 +62,7 @@ public class HCatRecordWriter extends Re
 
       // If partition columns occur in data, we want to remove them.
       partColsToDel = jobInfo.getPosOfPartCols();
-      dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
+      dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed();
       dynamicPartCols = jobInfo.getPosOfDynPartCols();
       maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Mon Aug  1 17:17:04 2011
@@ -18,13 +18,11 @@
 
 package org.apache.hcatalog.mapreduce;
 
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.schema.HCatSchema;
-
-import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 
 /**
  *
@@ -37,47 +35,128 @@ public class HCatTableInfo implements Se
 
   private static final long serialVersionUID = 1L;
 
+  public enum TableInfoType {
+    INPUT_INFO,
+    OUTPUT_INFO
+  };
+
+  private final TableInfoType tableInfoType;
+
+  /** The Metadata server uri */
+  private final String serverUri;
+
+  /** If the hcat server is configured to work with hadoop security, this
+   * variable will hold the principal name of the server - this will be used
+   * in the authentication to the hcat server using kerberos
+   */
+  private final String serverKerberosPrincipal;
+
   /** The db and table names */
   private final String dbName;
   private final String tableName;
 
-  /** The table schema. */
-  private final HCatSchema dataColumns;
-  private final HCatSchema partitionColumns;
+  /** The partition filter */
+  private String filter;
+
+  /** The partition predicates to filter on, an arbitrary AND/OR filter, if used to input from*/
+  private final String partitionPredicates;
 
-  /** The table being written to */
-  private final Table table;
+  /** The information about the partitions matching the specified query */
+  private JobInfo jobInfo;
 
-  /** The storer info */
-  private StorerInfo storerInfo;
+  /** The partition values to publish to, if used for output*/
+  private Map<String, String> partitionValues;
+
+  /** List of keys for which values were not specified at write setup time, to be infered at write time */
+  private List<String> dynamicPartitioningKeys;
+  
 
   /**
    * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
    * for reading data from a table.
+   * @param serverUri the Metadata server uri
+   * @param serverKerberosPrincipal If the hcat server is configured to
    * work with hadoop security, the kerberos principal name of the server - else null
    * The principal name should be of the form:
    * <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
    * The special string _HOST will be replaced automatically with the correct host name
    * @param dbName the db name
    * @param tableName the table name
-   * @param dataColumns schema of columns which contain data
-   * @param partitionColumns schema of partition columns
-   * @param storerInfo information about storage descriptor
-   * @param table hive metastore table class
    */
-  HCatTableInfo(
+  public static HCatTableInfo getInputTableInfo(String serverUri,
+      String serverKerberosPrincipal,
       String dbName,
-      String tableName,
-      HCatSchema dataColumns,
-      HCatSchema partitionColumns,
-      StorerInfo storerInfo,
-      Table table) {
+          String tableName) {
+    return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, (String) null);
+  }
+
+  /**
+   * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
+   * for reading data from a table.
+   * @param serverUri the Metadata server uri
+   * @param serverKerberosPrincipal If the hcat server is configured to
+   * work with hadoop security, the kerberos principal name of the server - else null
+   * The principal name should be of the form:
+   * <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
+   * The special string _HOST will be replaced automatically with the correct host name
+   * @param dbName the db name
+   * @param tableName the table name
+   * @param filter the partition filter
+   */
+  public static HCatTableInfo getInputTableInfo(String serverUri, String serverKerberosPrincipal, String dbName,
+          String tableName, String filter) {
+    return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, filter);
+  }
+
+  private HCatTableInfo(String serverUri, String serverKerberosPrincipal,
+      String dbName, String tableName, String filter) {
+      this.serverUri = serverUri;
+      this.serverKerberosPrincipal = serverKerberosPrincipal;
+      this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+      this.tableName = tableName;
+      this.partitionPredicates = null;
+      this.partitionValues = null;
+      this.tableInfoType = TableInfoType.INPUT_INFO;
+      this.filter = filter;
+  }
+  /**
+   * Initializes a new HCatTableInfo instance to be used with {@link HCatOutputFormat}
+   * for writing data from a table.
+   * @param serverUri the Metadata server uri
+   * @param serverKerberosPrincipal If the hcat server is configured to
+   * work with hadoop security, the kerberos principal name of the server - else null
+   * The principal name should be of the form:
+   * <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
+   * The special string _HOST will be replaced automatically with the correct host name
+   * @param dbName the db name
+   * @param tableName the table name
+   * @param partitionValues The partition values to publish to, can be null or empty Map to
+   * indicate write to a unpartitioned table. For partitioned tables, this map should
+   * contain keys for all partition columns with corresponding values.
+   */
+  public static HCatTableInfo getOutputTableInfo(String serverUri,
+          String serverKerberosPrincipal, String dbName, String tableName, Map<String, String> partitionValues){
+      return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName,
+          tableName, partitionValues);
+  }
+
+  private HCatTableInfo(String serverUri, String serverKerberosPrincipal,
+      String dbName, String tableName, Map<String, String> partitionValues){
+    this.serverUri = serverUri;
+    this.serverKerberosPrincipal = serverKerberosPrincipal;
     this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
     this.tableName = tableName;
-    this.dataColumns = dataColumns;
-    this.table = table;
-    this.storerInfo = storerInfo;
-    this.partitionColumns = partitionColumns;
+    this.partitionPredicates = null;
+    this.partitionValues = partitionValues;
+    this.tableInfoType = TableInfoType.OUTPUT_INFO;
+  }
+
+  /**
+   * Gets the value of serverUri
+   * @return the serverUri
+   */
+  public String getServerUri() {
+    return serverUri;
   }
 
   /**
@@ -97,80 +176,97 @@ public class HCatTableInfo implements Se
   }
 
   /**
-   * @return return schema of data columns as defined in meta store
+   * Gets the value of partitionPredicates
+   * @return the partitionPredicates
    */
-  public HCatSchema getDataColumns() {
-    return dataColumns;
+  public String getPartitionPredicates() {
+    return partitionPredicates;
   }
 
   /**
-   * @return schema of partition columns
+   * Gets the value of partitionValues
+   * @return the partitionValues
    */
-  public HCatSchema getPartitionColumns() {
-    return partitionColumns;
+  public Map<String, String> getPartitionValues() {
+    return partitionValues;
   }
 
   /**
-   * @return the storerInfo
+   * Gets the value of job info
+   * @return the job info
    */
-  public StorerInfo getStorerInfo() {
-    return storerInfo;
+  public JobInfo getJobInfo() {
+    return jobInfo;
   }
 
   /**
-   * minimize dependency on hive classes so this is package private
-   * this should eventually no longer be used
-   * @return hive metastore representation of table
+   * Sets the value of jobInfo
+   * @param jobInfo the jobInfo to set
    */
-  Table getTable() {
-    return table;
+  public void setJobInfo(JobInfo jobInfo) {
+    this.jobInfo = jobInfo;
+  }
+
+  public TableInfoType getTableType(){
+    return this.tableInfoType;
   }
 
   /**
-   * create an HCatTableInfo instance from the supplied Hive Table instance
-   * @param table to create an instance from
-   * @return HCatTableInfo
-   * @throws IOException
+   * Sets the value of partitionValues
+   * @param partitionValues the partition values to set
    */
-  static HCatTableInfo valueOf(Table table) throws IOException {
-    HCatSchema dataColumns = HCatUtil.extractSchemaFromStorageDescriptor(table.getSd());
-    StorerInfo storerInfo = InitializeInput.extractStorerInfo(table.getSd(), table.getParameters());
-    HCatSchema partitionColumns = HCatUtil.getPartitionColumns(table);
-    return new HCatTableInfo(table.getDbName(),
-                                           table.getTableName(),
-                                           dataColumns,
-                                           partitionColumns,
-                                           storerInfo,
-                                           table);
+  void setPartitionValues(Map<String, String>  partitionValues) {
+    this.partitionValues = partitionValues;
   }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    HCatTableInfo tableInfo = (HCatTableInfo) o;
-
-    if (dataColumns != null ? !dataColumns.equals(tableInfo.dataColumns) : tableInfo.dataColumns != null) return false;
-    if (dbName != null ? !dbName.equals(tableInfo.dbName) : tableInfo.dbName != null) return false;
-    if (partitionColumns != null ? !partitionColumns.equals(tableInfo.partitionColumns) : tableInfo.partitionColumns != null)
-      return false;
-    if (storerInfo != null ? !storerInfo.equals(tableInfo.storerInfo) : tableInfo.storerInfo != null) return false;
-    if (table != null ? !table.equals(tableInfo.table) : tableInfo.table != null) return false;
-    if (tableName != null ? !tableName.equals(tableInfo.tableName) : tableInfo.tableName != null) return false;
+  /**
+   * Gets the value of partition filter
+   * @return the filter string
+   */
+  public String getFilter() {
+    return filter;
+  }
 
-    return true;
+  /**
+   * @return the serverKerberosPrincipal
+   */
+  public String getServerKerberosPrincipal() {
+    return serverKerberosPrincipal;
+  }
+
+  /**
+   * Returns whether or not Dynamic Partitioning is used
+   * @return whether or not dynamic partitioning is currently enabled and used
+   */
+  public boolean isDynamicPartitioningUsed() {
+    return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
+  }
+
+  /**
+   * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys
+   * @param dynamicPartitioningKeys
+   */
+  public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
+    this.dynamicPartitioningKeys = dynamicPartitioningKeys;
+  }
+  
+  public List<String> getDynamicPartitioningKeys(){
+    return this.dynamicPartitioningKeys;
   }
 
 
   @Override
   public int hashCode() {
-    int result = dbName != null ? dbName.hashCode() : 0;
-    result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
-    result = 31 * result + (dataColumns != null ? dataColumns.hashCode() : 0);
-    result = 31 * result + (partitionColumns != null ? partitionColumns.hashCode() : 0);
-    result = 31 * result + (table != null ? table.hashCode() : 0);
-    result = 31 * result + (storerInfo != null ? storerInfo.hashCode() : 0);
+    int result = 17;
+    result = 31*result + (serverUri == null ? 0 : serverUri.hashCode());
+    result = 31*result + (serverKerberosPrincipal == null ? 0 : serverKerberosPrincipal.hashCode());
+    result = 31*result + (dbName == null? 0 : dbName.hashCode());
+    result = 31*result + tableName.hashCode();
+    result = 31*result + (filter == null? 0 : filter.hashCode());
+    result = 31*result + (partitionPredicates == null ? 0 : partitionPredicates.hashCode());
+    result = 31*result + tableInfoType.ordinal();
+    result = 31*result + (partitionValues == null ? 0 : partitionValues.hashCode());
+    result = 31*result + (dynamicPartitioningKeys == null ? 0 : dynamicPartitioningKeys.hashCode());
     return result;
   }
 

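For readers tracing the API restored by this revert, here is a minimal sketch of how client code typically obtains the two kinds of HCatTableInfo through the factory methods above. The metastore URI, kerberos principal, database, table names, filter and partition values are illustrative placeholders, not values taken from this patch.

import java.util.HashMap;
import java.util.Map;

import org.apache.hcatalog.mapreduce.HCatTableInfo;

public class TableInfoSketch {
  public static void main(String[] args) {
    // Read side: a partition filter selects which partitions to scan.
    HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
        "thrift://metastore.example.com:9083",   // placeholder metastore URI
        "hcat/_HOST@EXAMPLE.COM",                // server principal, or null if unsecured
        "default",                               // db name (null also means the default db)
        "web_logs",                              // placeholder table name
        "ds=\"20110801\"");                      // placeholder partition filter

    // Write side: the map names the partition being published; null or an
    // empty map indicates an unpartitioned table.
    Map<String, String> partitionValues = new HashMap<String, String>();
    partitionValues.put("ds", "20110801");
    HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(
        "thrift://metastore.example.com:9083",
        "hcat/_HOST@EXAMPLE.COM",
        "default",
        "web_logs_summary",
        partitionValues);
  }
}
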
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Mon Aug  1 17:17:04 2011
@@ -17,6 +17,13 @@
  */
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -31,9 +38,6 @@ import org.apache.hcatalog.common.HCatEx
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
-import java.io.IOException;
-import java.util.*;
-
 /**
  * The Class which handles querying the metadata server using the MetaStoreClient. The list of
  * partitions matching the partition filter is fetched from the server and the information is
@@ -46,15 +50,15 @@ public class InitializeInput {
   static final String HCAT_KEY_PREFIX = "hcat.";
   private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
 
-  private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, InputJobInfo inputJobInfo) throws Exception {
-    if (inputJobInfo.getServerUri() != null){
+  private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, HCatTableInfo inputInfo) throws Exception {
+    if (inputInfo.getServerUri() != null){
 
       hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
       hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
-          inputJobInfo.getServerKerberosPrincipal());
+          inputInfo.getServerKerberosPrincipal());
 
       hiveConf.set("hive.metastore.local", "false");
-      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputJobInfo.getServerUri());
+      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputInfo.getServerUri());
     }
 
     return new HiveMetaStoreClient(hiveConf,null);
@@ -64,29 +68,28 @@ public class InitializeInput {
    * Set the input to use for the Job. This queries the metadata server with the specified partition predicates,
    * gets the matching partitions, puts the information in the configuration object.
    * @param job the job object
-   * @param inputJobInfo information on the Input to read
+   * @param inputInfo the hcat table input info
    * @throws Exception
    */
-  public static void setInput(Job job, InputJobInfo inputJobInfo) throws Exception {
+  public static void setInput(Job job, HCatTableInfo inputInfo) throws Exception {
 
-    //* Create and initialize an InputJobInfo object
-    //* Serialize the InputJobInfo and save in the Job's Configuration object
+    //* Create and initialize a JobInfo object
+    //* Serialize the JobInfo and save it in the Job's Configuration object
 
     HiveMetaStoreClient client = null;
 
     try {
-      client = createHiveMetaClient(job.getConfiguration(),inputJobInfo);
-      Table table = client.getTable(inputJobInfo.getDatabaseName(),
-                                                 inputJobInfo.getTableName());
+      client = createHiveMetaClient(job.getConfiguration(),inputInfo);
+      Table table = client.getTable(inputInfo.getDatabaseName(), inputInfo.getTableName());
+      HCatSchema tableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
 
       List<PartInfo> partInfoList = new ArrayList<PartInfo>();
 
       if( table.getPartitionKeys().size() != 0 ) {
         //Partitioned table
-        List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
-                                                                                inputJobInfo.getTableName(),
-                                                                                inputJobInfo.getFilter(),
-                                                                                (short) -1);
+        List<Partition> parts = client.listPartitionsByFilter(
+            inputInfo.getDatabaseName(), inputInfo.getTableName(),
+            inputInfo.getFilter(), (short) -1);
 
        // Default to 100,000 partitions if hcat.metastore.maxpartitions is not defined
         int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
@@ -107,12 +110,13 @@ public class InitializeInput {
         partInfo.setPartitionValues(new HashMap<String,String>());
         partInfoList.add(partInfo);
       }
-      inputJobInfo.setPartitions(partInfoList);
-      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
+
+      JobInfo hcatJobInfo = new JobInfo(inputInfo, tableSchema, partInfoList);
+      inputInfo.setJobInfo(hcatJobInfo);
 
       job.getConfiguration().set(
           HCatConstants.HCAT_KEY_JOB_INFO,
-          HCatUtil.serialize(inputJobInfo)
+          HCatUtil.serialize(hcatJobInfo)
       );
     } finally {
       if (client != null ) {

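As a hedged sketch of the front-end call path that exercises the restored setInput(): a driver builds an input HCatTableInfo and hands it to HCatInputFormat.setInput, which performs the metastore lookup above and serializes the resulting JobInfo into the job configuration. The job name, metastore URI, table coordinates and filter below are placeholders, not values from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatTableInfo;

public class ReadDriverSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "hcat-read-sketch");   // placeholder job name

    HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
        "thrift://metastore.example.com:9083",   // placeholder metastore URI
        null,                                    // kerberos principal, if the server is secured
        "default", "web_logs",                   // placeholder db and table
        "ds=\"20110801\"");                      // placeholder partition filter

    // Queries the metastore and stores the serialized JobInfo in the
    // configuration under HCatConstants.HCAT_KEY_JOB_INFO, as shown above.
    HCatInputFormat.setInput(job, inputInfo);
    job.setInputFormatClass(HCatInputFormat.class);
    // Mapper, reducer and output settings would follow here.
  }
}
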
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java Mon Aug  1 17:17:04 2011
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The class used to serialize and store the information read from the metadata server */
+public class JobInfo implements Serializable{
+
+    /** The serialization version */
+    private static final long serialVersionUID = 1L;
+
+    /** The db and table names. */
+    private final String dbName;
+    private final String tableName;
+
+    /** The table schema. */
+    private final HCatSchema tableSchema;
+
+    /** The list of partitions matching the filter. */
+    private final List<PartInfo> partitions;
+
+    /**
+     * Instantiates a new hcat job info.
+     * @param hcatTableInfo the hcat table info supplying the db and table names
+     * @param tableSchema the table schema
+     * @param partitions the partitions
+     */
+    public JobInfo(HCatTableInfo hcatTableInfo, HCatSchema tableSchema,
+            List<PartInfo> partitions) {
+        this.tableName = hcatTableInfo.getTableName();
+        this.dbName = hcatTableInfo.getDatabaseName();
+        this.tableSchema = tableSchema;
+        this.partitions = partitions;
+    }
+
+    /**
+     * Gets the value of dbName
+     * @return the dbName
+     */
+    public String getDatabaseName() {
+        return dbName;
+    }
+
+    /**
+     * Gets the value of tableName
+     * @return the tableName
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * Gets the value of tableSchema
+     * @return the tableSchema
+     */
+    public HCatSchema getTableSchema() {
+        return tableSchema;
+    }
+
+    /**
+     * Gets the value of partitions
+     * @return the partitions
+     */
+    public List<PartInfo> getPartitions() {
+        return partitions;
+    }
+
+}

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Mon Aug  1 17:17:04 2011
@@ -18,126 +18,79 @@
 
 package org.apache.hcatalog.mapreduce;
 
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hcatalog.data.schema.HCatSchema;
-
 import java.io.Serializable;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The class used to serialize and store the output related information  */
-public class OutputJobInfo implements Serializable {
+class OutputJobInfo implements Serializable {
+
+    /** The serialization version. */
+    private static final long serialVersionUID = 1L;
+
+    /** The table info provided by user. */
+    private final HCatTableInfo tableInfo;
+
+    /** The output schema. This is given to us by the user. This won't contain any
+     * partition columns, even if the user has specified them.
+     */
+    private HCatSchema outputSchema;
+
+    /** This is table schema, retrieved from metastore. */
+    private final HCatSchema tableSchema;
+
+    /** The storer info */
+    private final StorerInfo storerInfo;
+
+    /** The location of the partition being written */
+    private final String location;
+
+    /** The table being written to */
+    private final Table table;
+
+    /** Positions of partition columns which will be deleted from the data, if
+     * the data contains partition columns. */
+
+    private List<Integer> posOfPartCols;
+    private List<Integer> posOfDynPartCols;
+
+    private int maxDynamicPartitions;
 
-  /** The db and table names. */
-  private final String databaseName;
-  private final String tableName;
-
-  /** The serialization version. */
-  private static final long serialVersionUID = 1L;
-
-  /** The table info provided by user. */
-  private HCatTableInfo tableInfo;
-
-  /** The output schema. This is given to us by user.  This wont contain any
-   * partition columns ,even if user has specified them.
-   * */
-  private HCatSchema outputSchema;
-
-  /** The location of the partition being written */
-  private String location;
-
-  /** The partition values to publish to, if used for output*/
-  private Map<String, String> partitionValues;
-
-  /** The Metadata server uri */
-  private final String serverUri;
-
-  /** If the hcat server is configured to work with hadoop security, this
-   * variable will hold the principal name of the server - this will be used
-   * in the authentication to the hcat server using kerberos
-   */
-  private final String serverKerberosPrincipal;
-
-  private List<Integer> posOfPartCols;
-  private List<Integer> posOfDynPartCols;
-
-  private Map<String,String> properties;
-
-  private int maxDynamicPartitions;
-
-  /** List of keys for which values were not specified at write setup time, to be infered at write time */
-  private List<String> dynamicPartitioningKeys;
-
-  private boolean harRequested;
-
-  /**
-   * Initializes a new OutputJobInfo instance
-   * for writing data from a table.
-   * @param databaseName the db name
-   * @param tableName the table name
-   * @param partitionValues The partition values to publish to, can be null or empty Map to
-   * @param serverUri the Metadata server uri
-   * @param serverKerberosPrincipal If the hcat server is configured to
-   * work with hadoop security, the kerberos principal name of the server - else null
-   * The principal name should be of the form:
-   * <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
-   * The special string _HOST will be replaced automatically with the correct host name
-   * indicate write to a unpartitioned table. For partitioned tables, this map should
-   * contain keys for all partition columns with corresponding values.
-   */
-  public static OutputJobInfo create(String databaseName,
-                                     String tableName,
-                                     Map<String, String> partitionValues,
-                                     String serverUri,
-                                     String serverKerberosPrincipal) {
-    return new OutputJobInfo(databaseName,
-        tableName,
-        partitionValues,
-        serverUri,
-        serverKerberosPrincipal);
-  }
-
-  private OutputJobInfo(String databaseName,
-                        String tableName,
-                        Map<String, String> partitionValues,
-                        String serverUri,
-                        String serverKerberosPrincipal) {
-    this.databaseName =  (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
-    this.tableName = tableName;
-    this.serverUri = serverUri;
-    this.serverKerberosPrincipal = serverKerberosPrincipal;
-    this.partitionValues = partitionValues;
-    this.properties = new HashMap<String,String>();
-  }
-
-  /**
-   * @return the posOfPartCols
-   */
-  protected List<Integer> getPosOfPartCols() {
-    return posOfPartCols;
-  }
-
-  /**
-   * @return the posOfDynPartCols
-   */
-  protected List<Integer> getPosOfDynPartCols() {
-    return posOfDynPartCols;
-  }
-
-  /**
-   * @param posOfPartCols the posOfPartCols to set
-   */
-  protected void setPosOfPartCols(List<Integer> posOfPartCols) {
-    // sorting the list in the descending order so that deletes happen back-to-front
-    Collections.sort(posOfPartCols, new Comparator<Integer> () {
-      @Override
-      public int compare(Integer earlier, Integer later) {
-        return (earlier > later) ? -1 : ((earlier == later) ? 0 : 1);
-      }
-    });
-    this.posOfPartCols = posOfPartCols;
-  }
+    private boolean harRequested;
 
-  /**
+    /**
+     * @return the posOfPartCols
+     */
+    protected List<Integer> getPosOfPartCols() {
+      return posOfPartCols;
+    }
+
+    /**
+     * @return the posOfDynPartCols
+     */
+    protected List<Integer> getPosOfDynPartCols() {
+      return posOfDynPartCols;
+    }
+
+    /**
+     * @param posOfPartCols the posOfPartCols to set
+     */
+    protected void setPosOfPartCols(List<Integer> posOfPartCols) {
+      // sorting the list in the descending order so that deletes happen back-to-front
+      Collections.sort(posOfPartCols, new Comparator<Integer> () {
+        @Override
+        public int compare(Integer earlier, Integer later) {
+          return (earlier > later) ? -1 : (earlier.equals(later) ? 0 : 1);
+        }
+      });
+      this.posOfPartCols = posOfPartCols;
+    }
+
+    /**
      * @param posOfDynPartCols the posOfDynPartCols to set
      */
     protected void setPosOfDynPartCols(List<Integer> posOfDynPartCols) {
@@ -145,153 +98,97 @@ public class OutputJobInfo implements Se
       this.posOfDynPartCols = posOfDynPartCols;
     }
 
-  /**
-   * @return the tableInfo
-   */
-  public HCatTableInfo getTableInfo() {
-    return tableInfo;
-  }
-
-  /**
-   * @return the outputSchema
-   */
-  public HCatSchema getOutputSchema() {
-    return outputSchema;
-  }
-
-  /**
-   * @param schema the outputSchema to set
-   */
-  public void setOutputSchema(HCatSchema schema) {
-    this.outputSchema = schema;
-  }
-
-  /**
-   * @return the location
-   */
-  public String getLocation() {
-    return location;
-  }
-
-  /**
-   * @param location location to write to
-   */
-  void setLocation(String location) {
-    this.location = location;
-  }
-  /**
-   * Sets the value of partitionValues
-   * @param partitionValues the partition values to set
-   */
-  void setPartitionValues(Map<String, String>  partitionValues) {
-    this.partitionValues = partitionValues;
-  }
-
-  /**
-   * Gets the value of partitionValues
-   * @return the partitionValues
-   */
-  public Map<String, String> getPartitionValues() {
-    return partitionValues;
-  }
-
-  /**
-   * @return metastore thrift server URI
-   */
-  public String getServerUri() {
-    return serverUri;
-  }
-
-  /**
-   * @return the serverKerberosPrincipal
-   */
-  public String getServerKerberosPrincipal() {
-    return serverKerberosPrincipal;
-  }
-
-  /**
-   * set the tablInfo instance
-   * this should be the same instance
-   * determined by this object's DatabaseName and TableName
-   * @param tableInfo
-   */
-  void setTableInfo(HCatTableInfo tableInfo) {
-    this.tableInfo = tableInfo;
-  }
-
-  /**
-   * @return database name of table to write to
-   */
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  /**
-   * @return name of table to write to
-   */
-  public String getTableName() {
-    return tableName;
-  }
-
-  /**
-   * Set/Get Property information to be passed down to *StorageDriver implementation
-   * put implementation specific storage driver configurations here
-   * @return
-   */
-  public Map<String,String> getProperties() {
-    return properties;
-  }
-
-  /**
-   * Set maximum number of allowable dynamic partitions
-   * @param maxDynamicPartitions
-   */
-  public void setMaximumDynamicPartitions(int maxDynamicPartitions){
-    this.maxDynamicPartitions = maxDynamicPartitions;
-  }
-
-  /**
-   * Returns maximum number of allowable dynamic partitions
-   * @return maximum number of allowable dynamic partitions
-   */
-  public int getMaxDynamicPartitions() {
-    return this.maxDynamicPartitions;
-  }
-
-  /**
-   * Sets whether or not hadoop archiving has been requested for this job
-   * @param harRequested
-   */
-  public void setHarRequested(boolean harRequested){
-    this.harRequested = harRequested;
-  }
-
-  /**
-   * Returns whether or not hadoop archiving has been requested for this job
-   * @return whether or not hadoop archiving has been requested for this job
-   */
-  public boolean getHarRequested() {
-    return this.harRequested;
-  }
-
-  /**
-   * Returns whether or not Dynamic Partitioning is used
-   * @return whether or not dynamic partitioning is currently enabled and used
-   */
-  public boolean isDynamicPartitioningUsed() {
-    return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
-  }
-
-  /**
-   * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys
-   * @param dynamicPartitioningKeys
-   */
-  public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
-    this.dynamicPartitioningKeys = dynamicPartitioningKeys;
-  }
-
-  public List<String> getDynamicPartitioningKeys(){
-    return this.dynamicPartitioningKeys;
-  }
+    public OutputJobInfo(HCatTableInfo tableInfo, HCatSchema outputSchema, HCatSchema tableSchema,
+        StorerInfo storerInfo, String location, Table table) {
+      super();
+      this.tableInfo = tableInfo;
+      this.outputSchema = outputSchema;
+      this.tableSchema = tableSchema;
+      this.storerInfo = storerInfo;
+      this.location = location;
+      this.table = table;
+    }
+
+    /**
+     * @return the tableInfo
+     */
+    public HCatTableInfo getTableInfo() {
+      return tableInfo;
+    }
+
+    /**
+     * @return the outputSchema
+     */
+    public HCatSchema getOutputSchema() {
+      return outputSchema;
+    }
+
+    /**
+     * @param schema the outputSchema to set
+     */
+    public void setOutputSchema(HCatSchema schema) {
+      this.outputSchema = schema;
+    }
+
+    /**
+     * @return the tableSchema
+     */
+    public HCatSchema getTableSchema() {
+      return tableSchema;
+    }
+
+    /**
+     * @return the storerInfo
+     */
+    public StorerInfo getStorerInfo() {
+      return storerInfo;
+    }
+
+    /**
+     * @return the location
+     */
+    public String getLocation() {
+      return location;
+    }
+
+    /**
+     * Gets the value of table
+     * @return the table
+     */
+    public Table getTable() {
+      return table;
+    }
+
+    /**
+     * Set maximum number of allowable dynamic partitions
+     * @param maxDynamicPartitions
+     */
+    public void setMaximumDynamicPartitions(int maxDynamicPartitions){
+      this.maxDynamicPartitions = maxDynamicPartitions;
+    }
+    
+    /**
+     * Returns maximum number of allowable dynamic partitions
+     * @return maximum number of allowable dynamic partitions
+     */
+    public int getMaxDynamicPartitions() {
+      return this.maxDynamicPartitions;
+    }
+
+    /**
+     * Sets whether or not hadoop archiving has been requested for this job
+     * @param harRequested
+     */
+    public void setHarRequested(boolean harRequested){
+      this.harRequested = harRequested;
+    }
+    
+    /**
+     * Returns whether or not hadoop archiving has been requested for this job
+     * @return whether or not hadoop archiving has been requested for this job
+     */
+    public boolean getHarRequested() {
+      return this.harRequested;
+    }
 
 }

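The OutputJobInfo above is package-private and is constructed inside the mapreduce package; user code only supplies an output HCatTableInfo to HCatOutputFormat.setOutput, as the reverted HCatStorer code further below does. A minimal sketch of that write-side setup follows; the metastore URI, table name and partition values are placeholder assumptions.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.HCatTableInfo;

public class WriteDriverSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "hcat-write-sketch");  // placeholder job name

    // One value per partition key of the target table; null or an empty
    // map indicates an unpartitioned table.
    Map<String, String> partitionValues = new HashMap<String, String>();
    partitionValues.put("ds", "20110801");

    HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(
        "thrift://metastore.example.com:9083",   // placeholder metastore URI
        null,                                    // kerberos principal, if the server is secured
        "default", "web_logs_summary",           // placeholder db and table
        partitionValues);

    HCatOutputFormat.setOutput(job, outputInfo); // builds and stores the OutputJobInfo
    job.setOutputFormatClass(HCatOutputFormat.class);
    // The output schema and mapper/reducer classes would be configured here.
  }
}
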
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java Mon Aug  1 17:17:04 2011
@@ -17,6 +17,10 @@
  */
 package org.apache.hcatalog.pig;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -27,7 +31,7 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.pig.Expression;
 import org.apache.pig.Expression.BinaryExpression;
 import org.apache.pig.LoadFunc;
@@ -35,10 +39,6 @@ import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.impl.util.UDFContext;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
 /**
  * Pig {@link LoadFunc} to read data from HCat
  */
@@ -82,12 +82,14 @@ public class HCatLoader extends HCatBase
     // in the hadoop front end mapred.task.id property will not be set in
     // the Configuration
     if (!HCatUtil.checkJobContextIfRunningFromBackend(job)){
-      HCatInputFormat.setInput(job,
-                                            InputJobInfo.create(dbName,
-                                                                         tableName,
-                                                                         getPartitionFilterString(),
-                                                                         hcatServerUri != null ? hcatServerUri : (hcatServerUri = PigHCatUtil.getHCatServerUri(job)),
-                                                                         PigHCatUtil.getHCatServerPrincipal(job)));
+
+      HCatInputFormat.setInput(job, HCatTableInfo.getInputTableInfo(
+              hcatServerUri!=null ? hcatServerUri :
+                  (hcatServerUri = PigHCatUtil.getHCatServerUri(job)),
+              PigHCatUtil.getHCatServerPrincipal(job),
+              dbName,
+              tableName,
+              getPartitionFilterString()));
     }
 
     // Need to also push projections by calling setOutputSchema on

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1152865&r1=1152864&r2=1152865&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Mon Aug  1 17:17:04 2011
@@ -18,6 +18,9 @@
 
 package org.apache.hcatalog.pig;
 
+import java.io.IOException;
+import java.util.Properties;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -27,7 +30,7 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -35,9 +38,6 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 
-import java.io.IOException;
-import java.util.Properties;
-
 /**
  * HCatStorer.
  *
@@ -72,20 +72,13 @@ public class HCatStorer extends HCatBase
     Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
 
     String[] userStr = location.split("\\.");
-    OutputJobInfo outputJobInfo;
-
+    HCatTableInfo tblInfo;
     if(userStr.length == 2) {
-      outputJobInfo = OutputJobInfo.create(userStr[0],
-                                                             userStr[1],
-                                                             partitions,
-                                                             PigHCatUtil.getHCatServerUri(job),
-                                                             PigHCatUtil.getHCatServerPrincipal(job));
+      tblInfo = HCatTableInfo.getOutputTableInfo(PigHCatUtil.getHCatServerUri(job),
+          PigHCatUtil.getHCatServerPrincipal(job), userStr[0],userStr[1],partitions);
     } else {
-      outputJobInfo = OutputJobInfo.create(null,
-                                                             userStr[0],
-                                                             partitions,
-                                                             PigHCatUtil.getHCatServerUri(job),
-                                                             PigHCatUtil.getHCatServerPrincipal(job));
+      tblInfo = HCatTableInfo.getOutputTableInfo(PigHCatUtil.getHCatServerUri(job),
+          PigHCatUtil.getHCatServerPrincipal(job), null,userStr[0],partitions);
     }
 
 
@@ -101,7 +94,7 @@ public class HCatStorer extends HCatBase
         throw new FrontendException("Schema for data cannot be determined.", PigHCatUtil.PIG_EXCEPTION_CODE);
       }
       try{
-        HCatOutputFormat.setOutput(job, outputJobInfo);
+        HCatOutputFormat.setOutput(job, tblInfo);
       } catch(HCatException he) {
           // pass the message to the user - essentially something about the table
           // information passed to HCatOutputFormat was not right