Posted to hcatalog-commits@incubator.apache.org by tr...@apache.org on 2012/09/10 23:29:03 UTC

svn commit: r1383152 [11/27] - in /incubator/hcatalog/trunk: ./ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ ...

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java Mon Sep 10 23:28:55 2012
@@ -46,272 +46,271 @@ import org.apache.hcatalog.data.HCatReco
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
-public abstract class HCatBaseInputFormat 
-  extends InputFormat<WritableComparable, HCatRecord> {
-  
-  /**
-   * get the schema for the HCatRecord data returned by HCatInputFormat.
-   * 
-   * @param context the jobContext
-   * @throws IllegalArgumentException
-   */
-  private Class<? extends InputFormat> inputFileFormatClass;
-
-  // TODO needs to go in InitializeInput? as part of InputJobInfo
-  public static HCatSchema getOutputSchema(JobContext context) 
-    throws IOException {
-    String os = context.getConfiguration().get(
-                                HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
-    if (os == null) {
-      return getTableSchema(context);
-    } else {
-      return (HCatSchema) HCatUtil.deserialize(os);
-    }
-  }
-  
-  /**
-   * Set the schema for the HCatRecord data returned by HCatInputFormat.
-   * @param job the job object
-   * @param hcatSchema the schema to use as the consolidated schema
-   */
-  public static void setOutputSchema(Job job,HCatSchema hcatSchema) 
-    throws IOException {
-    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, 
-                               HCatUtil.serialize(hcatSchema));
-  }
-
-  protected static
-    org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>
-    getMapRedInputFormat (JobConf job, Class inputFormatClass) throws IOException {
-      return (
-          org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>) 
-        ReflectionUtils.newInstance(inputFormatClass, job);
-  }
-
-  /**
-   * Logically split the set of input files for the job. Returns the
-   * underlying InputFormat's splits
-   * @param jobContext the job context object
-   * @return the splits, an HCatInputSplit wrapper over the storage
-   *         handler InputSplits
-   * @throws IOException or InterruptedException
-   */
-  @Override
-  public List<InputSplit> getSplits(JobContext jobContext)
-  throws IOException, InterruptedException {
-
-    //Get the job info from the configuration,
-    //throws exception if not initialized
-    InputJobInfo inputJobInfo;
-    try {
-      inputJobInfo = getJobInfo(jobContext);
-    } catch (Exception e) {
-      throw new IOException(e);
+public abstract class HCatBaseInputFormat
+    extends InputFormat<WritableComparable, HCatRecord> {
+
+    /**
+     * get the schema for the HCatRecord data returned by HCatInputFormat.
+     *
+     * @param context the jobContext
+     * @throws IllegalArgumentException
+     */
+    private Class<? extends InputFormat> inputFileFormatClass;
+
+    // TODO needs to go in InitializeInput? as part of InputJobInfo
+    public static HCatSchema getOutputSchema(JobContext context)
+        throws IOException {
+        String os = context.getConfiguration().get(
+            HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+        if (os == null) {
+            return getTableSchema(context);
+        } else {
+            return (HCatSchema) HCatUtil.deserialize(os);
+        }
     }
 
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-    List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
-    if(partitionInfoList == null ) {
-      //No partitions match the specified partition filter
-      return splits;
+    /**
+     * Set the schema for the HCatRecord data returned by HCatInputFormat.
+     * @param job the job object
+     * @param hcatSchema the schema to use as the consolidated schema
+     */
+    public static void setOutputSchema(Job job, HCatSchema hcatSchema)
+        throws IOException {
+        job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
+            HCatUtil.serialize(hcatSchema));
     }
 
-    HCatStorageHandler storageHandler;
-    JobConf jobConf;
-    Configuration conf = jobContext.getConfiguration();
-    //For each matching partition, call getSplits on the underlying InputFormat
-    for(PartInfo partitionInfo : partitionInfoList) {
-      jobConf = HCatUtil.getJobConfFromContext(jobContext);
-      setInputPath(jobConf, partitionInfo.getLocation());
-      Map<String,String> jobProperties = partitionInfo.getJobProperties();
-
-      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for(HCatFieldSchema field: 
-          inputJobInfo.getTableInfo().getDataColumns().getFields())
-          allCols.append(field);
-      for(HCatFieldSchema field: 
-          inputJobInfo.getTableInfo().getPartitionColumns().getFields())
-          allCols.append(field);
-
-      HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
-
-      storageHandler = HCatUtil.getStorageHandler(
-          jobConf, partitionInfo);
-
-      //Get the input format
-      Class inputFormatClass = storageHandler.getInputFormatClass();
-      org.apache.hadoop.mapred.InputFormat inputFormat = 
-                            getMapRedInputFormat(jobConf, inputFormatClass);
-
-      //Call getSplit on the InputFormat, create an
-      //HCatSplit for each underlying split
-      //NumSplits is 0 for our purposes
-      org.apache.hadoop.mapred.InputSplit[] baseSplits = 
-        inputFormat.getSplits(jobConf, 0);
-
-      for(org.apache.hadoop.mapred.InputSplit split : baseSplits) {
-        splits.add(new HCatSplit(
-            partitionInfo,
-            split,allCols));
-      }
+    protected static org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>
+    getMapRedInputFormat(JobConf job, Class inputFormatClass) throws IOException {
+        return (
+            org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>)
+            ReflectionUtils.newInstance(inputFormatClass, job);
     }
 
-    return splits;
-  }
+    /**
+     * Logically split the set of input files for the job. Returns the
+     * underlying InputFormat's splits
+     * @param jobContext the job context object
+     * @return the splits, an HCatInputSplit wrapper over the storage
+     *         handler InputSplits
+     * @throws IOException or InterruptedException
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext)
+        throws IOException, InterruptedException {
+
+        //Get the job info from the configuration,
+        //throws exception if not initialized
+        InputJobInfo inputJobInfo;
+        try {
+            inputJobInfo = getJobInfo(jobContext);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
+        if (partitionInfoList == null) {
+            //No partitions match the specified partition filter
+            return splits;
+        }
+
+        HCatStorageHandler storageHandler;
+        JobConf jobConf;
+        Configuration conf = jobContext.getConfiguration();
+        //For each matching partition, call getSplits on the underlying InputFormat
+        for (PartInfo partitionInfo : partitionInfoList) {
+            jobConf = HCatUtil.getJobConfFromContext(jobContext);
+            setInputPath(jobConf, partitionInfo.getLocation());
+            Map<String, String> jobProperties = partitionInfo.getJobProperties();
+
+            HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
+            for (HCatFieldSchema field :
+                inputJobInfo.getTableInfo().getDataColumns().getFields())
+                allCols.append(field);
+            for (HCatFieldSchema field :
+                inputJobInfo.getTableInfo().getPartitionColumns().getFields())
+                allCols.append(field);
+
+            HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+
+            storageHandler = HCatUtil.getStorageHandler(
+                jobConf, partitionInfo);
+
+            //Get the input format
+            Class inputFormatClass = storageHandler.getInputFormatClass();
+            org.apache.hadoop.mapred.InputFormat inputFormat =
+                getMapRedInputFormat(jobConf, inputFormatClass);
+
+            //Call getSplit on the InputFormat, create an
+            //HCatSplit for each underlying split
+            //NumSplits is 0 for our purposes
+            org.apache.hadoop.mapred.InputSplit[] baseSplits =
+                inputFormat.getSplits(jobConf, 0);
+
+            for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
+                splits.add(new HCatSplit(
+                    partitionInfo,
+                    split, allCols));
+            }
+        }
+
+        return splits;
+    }
 
-  /**
-   * Create the RecordReader for the given InputSplit. Returns the underlying
-   * RecordReader if the required operations are supported and schema matches
-   * with HCatTable schema. Returns an HCatRecordReader if operations need to
-   * be implemented in HCat.
-   * @param split the split
-   * @param taskContext the task attempt context
-   * @return the record reader instance, either an HCatRecordReader(later) or
-   *         the underlying storage handler's RecordReader
-   * @throws IOException or InterruptedException
-   */
-  @Override
-  public RecordReader<WritableComparable, HCatRecord> 
-  createRecordReader(InputSplit split,
-      TaskAttemptContext taskContext) throws IOException, InterruptedException {
-
-    HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
-    PartInfo partitionInfo = hcatSplit.getPartitionInfo();
-    JobContext jobContext = taskContext;
-
-    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
-        jobContext.getConfiguration(), partitionInfo);
-    
-    JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
-    Map<String, String> jobProperties = partitionInfo.getJobProperties();
-    HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+    /**
+     * Create the RecordReader for the given InputSplit. Returns the underlying
+     * RecordReader if the required operations are supported and schema matches
+     * with HCatTable schema. Returns an HCatRecordReader if operations need to
+     * be implemented in HCat.
+     * @param split the split
+     * @param taskContext the task attempt context
+     * @return the record reader instance, either an HCatRecordReader(later) or
+     *         the underlying storage handler's RecordReader
+     * @throws IOException or InterruptedException
+     */
+    @Override
+    public RecordReader<WritableComparable, HCatRecord>
+    createRecordReader(InputSplit split,
+                       TaskAttemptContext taskContext) throws IOException, InterruptedException {
+
+        HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
+        PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+        JobContext jobContext = taskContext;
+
+        HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
+            jobContext.getConfiguration(), partitionInfo);
+
+        JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
+        Map<String, String> jobProperties = partitionInfo.getJobProperties();
+        HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
 
-    Map<String,String> valuesNotInDataCols = getColValsNotInDataColumns(
-        getOutputSchema(jobContext),partitionInfo
+        Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
+            getOutputSchema(jobContext), partitionInfo
         );
 
-    return new HCatRecordReader(storageHandler, valuesNotInDataCols);
-  }
+        return new HCatRecordReader(storageHandler, valuesNotInDataCols);
+    }
 
 
-  /**
-   * gets values for fields requested by output schema which will not be in the data
-   */
-  private static Map<String,String> getColValsNotInDataColumns(HCatSchema outputSchema,
-      PartInfo partInfo){
-    HCatSchema dataSchema = partInfo.getPartitionSchema();
-    Map<String,String> vals = new HashMap<String,String>();
-    for (String fieldName : outputSchema.getFieldNames()){
-      if (dataSchema.getPosition(fieldName) == null){
-        // this entry of output is not present in the output schema
-        // so, we first check the table schema to see if it is a part col
-        
-        if (partInfo.getPartitionValues().containsKey(fieldName)){
-          vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
-        } else {
-          vals.put(fieldName, null);
+    /**
+     * gets values for fields requested by output schema which will not be in the data
+     */
+    private static Map<String, String> getColValsNotInDataColumns(HCatSchema outputSchema,
+                                                                  PartInfo partInfo) {
+        HCatSchema dataSchema = partInfo.getPartitionSchema();
+        Map<String, String> vals = new HashMap<String, String>();
+        for (String fieldName : outputSchema.getFieldNames()) {
+            if (dataSchema.getPosition(fieldName) == null) {
+                // this entry of output is not present in the output schema
+                // so, we first check the table schema to see if it is a part col
+
+                if (partInfo.getPartitionValues().containsKey(fieldName)) {
+                    vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
+                } else {
+                    vals.put(fieldName, null);
+                }
+            }
         }
-      }
+        return vals;
     }
-    return vals;
-  }
 
-  /**
-   * Gets the HCatTable schema for the table specified in the HCatInputFormat.setInput call
-   * on the specified job context. This information is available only after HCatInputFormat.setInput
-   * has been called for a JobContext.
-   * @param context the context
-   * @return the table schema
-   * @throws IOException if HCatInputFormat.setInput has not been called 
-   *                     for the current context
-   */
-  public static HCatSchema getTableSchema(JobContext context) 
-  throws IOException {
-    InputJobInfo inputJobInfo = getJobInfo(context);
-      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for(HCatFieldSchema field: 
-          inputJobInfo.getTableInfo().getDataColumns().getFields())
-          allCols.append(field);
-      for(HCatFieldSchema field: 
-          inputJobInfo.getTableInfo().getPartitionColumns().getFields())
-          allCols.append(field);
-    return allCols;
-  }
-
-  /**
-   * Gets the InputJobInfo object by reading the Configuration and deserializing
-   * the string. If InputJobInfo is not present in the configuration, throws an
-   * exception since that means HCatInputFormat.setInput has not been called.
-   * @param jobContext the job context
-   * @return the InputJobInfo object
-   * @throws IOException the exception
-   */
-  private static InputJobInfo getJobInfo(JobContext jobContext) 
-    throws IOException {
-    String jobString = jobContext.getConfiguration().get(
-                                  HCatConstants.HCAT_KEY_JOB_INFO);
-    if( jobString == null ) {
-      throw new IOException("job information not found in JobContext."
-         + " HCatInputFormat.setInput() not called?");
+    /**
+     * Gets the HCatTable schema for the table specified in the HCatInputFormat.setInput call
+     * on the specified job context. This information is available only after HCatInputFormat.setInput
+     * has been called for a JobContext.
+     * @param context the context
+     * @return the table schema
+     * @throws IOException if HCatInputFormat.setInput has not been called
+     *                     for the current context
+     */
+    public static HCatSchema getTableSchema(JobContext context)
+        throws IOException {
+        InputJobInfo inputJobInfo = getJobInfo(context);
+        HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
+        for (HCatFieldSchema field :
+            inputJobInfo.getTableInfo().getDataColumns().getFields())
+            allCols.append(field);
+        for (HCatFieldSchema field :
+            inputJobInfo.getTableInfo().getPartitionColumns().getFields())
+            allCols.append(field);
+        return allCols;
     }
 
-    return (InputJobInfo) HCatUtil.deserialize(jobString);
-  }
+    /**
+     * Gets the InputJobInfo object by reading the Configuration and deserializing
+     * the string. If InputJobInfo is not present in the configuration, throws an
+     * exception since that means HCatInputFormat.setInput has not been called.
+     * @param jobContext the job context
+     * @return the InputJobInfo object
+     * @throws IOException the exception
+     */
+    private static InputJobInfo getJobInfo(JobContext jobContext)
+        throws IOException {
+        String jobString = jobContext.getConfiguration().get(
+            HCatConstants.HCAT_KEY_JOB_INFO);
+        if (jobString == null) {
+            throw new IOException("job information not found in JobContext."
+                + " HCatInputFormat.setInput() not called?");
+        }
 
-  private void setInputPath(JobConf jobConf, String location) 
-  throws IOException{
+        return (InputJobInfo) HCatUtil.deserialize(jobString);
+    }
 
-    // ideally we should just call FileInputFormat.setInputPaths() here - but
-    // that won't work since FileInputFormat.setInputPaths() needs
-    // a Job object instead of a JobContext which we are handed here
-
-    int length = location.length();
-    int curlyOpen = 0;
-    int pathStart = 0;
-    boolean globPattern = false;
-    List<String> pathStrings = new ArrayList<String>();
-
-    for (int i=0; i<length; i++) {
-      char ch = location.charAt(i);
-      switch(ch) {
-      case '{' : {
-        curlyOpen++;
-        if (!globPattern) {
-          globPattern = true;
-        }
-        break;
-      }
-      case '}' : {
-        curlyOpen--;
-        if (curlyOpen == 0 && globPattern) {
-          globPattern = false;
-        }
-        break;
-      }
-      case ',' : {
-        if (!globPattern) {
-          pathStrings.add(location.substring(pathStart, i));
-          pathStart = i + 1 ;
+    private void setInputPath(JobConf jobConf, String location)
+        throws IOException {
+
+        // ideally we should just call FileInputFormat.setInputPaths() here - but
+        // that won't work since FileInputFormat.setInputPaths() needs
+        // a Job object instead of a JobContext which we are handed here
+
+        int length = location.length();
+        int curlyOpen = 0;
+        int pathStart = 0;
+        boolean globPattern = false;
+        List<String> pathStrings = new ArrayList<String>();
+
+        for (int i = 0; i < length; i++) {
+            char ch = location.charAt(i);
+            switch (ch) {
+            case '{': {
+                curlyOpen++;
+                if (!globPattern) {
+                    globPattern = true;
+                }
+                break;
+            }
+            case '}': {
+                curlyOpen--;
+                if (curlyOpen == 0 && globPattern) {
+                    globPattern = false;
+                }
+                break;
+            }
+            case ',': {
+                if (!globPattern) {
+                    pathStrings.add(location.substring(pathStart, i));
+                    pathStart = i + 1;
+                }
+                break;
+            }
+            }
         }
-        break;
-      }
-      }
-    }
-    pathStrings.add(location.substring(pathStart, length));
+        pathStrings.add(location.substring(pathStart, length));
 
-    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+        Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
 
-    FileSystem fs = FileSystem.get(jobConf);
-    Path path = paths[0].makeQualified(fs);
-    StringBuilder str = new StringBuilder(StringUtils.escapeString(
-                                                          path.toString()));
-    for(int i = 1; i < paths.length;i++) {
-      str.append(StringUtils.COMMA_STR);
-      path = paths[i].makeQualified(fs);
-      str.append(StringUtils.escapeString(path.toString()));
-    }
+        FileSystem fs = FileSystem.get(jobConf);
+        Path path = paths[0].makeQualified(fs);
+        StringBuilder str = new StringBuilder(StringUtils.escapeString(
+            path.toString()));
+        for (int i = 1; i < paths.length; i++) {
+            str.append(StringUtils.COMMA_STR);
+            path = paths[i].makeQualified(fs);
+            str.append(StringUtils.escapeString(path.toString()));
+        }
 
-    jobConf.set("mapred.input.dir", str.toString());
-  }
+        jobConf.set("mapred.input.dir", str.toString());
+    }
 
 }
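
For reference, the getTableSchema()/setOutputSchema() pair above is the column-pruning hook a client job drives before submitting. A minimal sketch, assuming HCatInputFormat.setInput() has already been called on the Job and using hypothetical column names:

    // Project only the columns the mappers need; all calls below are the
    // public static methods shown in this file. "price"/"quantity" are placeholders.
    HCatSchema tableSchema = HCatBaseInputFormat.getTableSchema(job);   // data + partition columns
    HCatSchema projection = new HCatSchema(new LinkedList<HCatFieldSchema>());
    projection.append(tableSchema.get("price"));      // throws HCatException if the column is absent
    projection.append(tableSchema.get("quantity"));
    HCatBaseInputFormat.setOutputSchema(job, projection);               // mappers now see only these fields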

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Mon Sep 10 23:28:55 2012
@@ -39,99 +39,99 @@ public abstract class HCatBaseOutputForm
 
 //  static final private Log LOG = LogFactory.getLog(HCatBaseOutputFormat.class);
 
-  /**
-   * Gets the table schema for the table specified in the HCatOutputFormat.setOutput call
-   * on the specified job context.
-   * @param context the context
-   * @return the table schema
-   * @throws IOException if HCatOutputFromat.setOutput has not been called for the passed context
-   */
-  public static HCatSchema getTableSchema(JobContext context) throws IOException {
-      OutputJobInfo jobInfo = getJobInfo(context);
-      return jobInfo.getTableInfo().getDataColumns();
-  }
-
-  /**
-   * Check for validity of the output-specification for the job.
-   * @param context information about the job
-   * @throws IOException when output should not be attempted
-   */
-  @Override
-  public void checkOutputSpecs(JobContext context
-                                        ) throws IOException, InterruptedException {
-    getOutputFormat(context).checkOutputSpecs(context);
-  }
-
-  /**
-   * Gets the output format instance.
-   * @param context the job context
-   * @return the output format instance
-   * @throws IOException
-   */
-  protected OutputFormat<WritableComparable<?>, HCatRecord> getOutputFormat(JobContext context) throws IOException {
-      OutputJobInfo jobInfo = getJobInfo(context);
-      HCatStorageHandler  storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
-      //why do we need this?
-      configureOutputStorageHandler(context);
-      return storageHandler.getOutputFormatContainer(ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(),context.getConfiguration()));
-  }
-
-  /**
-   * Gets the HCatOuputJobInfo object by reading the Configuration and deserializing
-   * the string. If InputJobInfo is not present in the configuration, throws an
-   * exception since that means HCatOutputFormat.setOutput has not been called.
-   * @param jobContext the job context
-   * @return the OutputJobInfo object
-   * @throws IOException the IO exception
-   */
-  public static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
-      String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-      if( jobString == null ) {
-          throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED);
-      }
-
-      return (OutputJobInfo) HCatUtil.deserialize(jobString);
-  }
-
-  /**
-   * Configure the output storage handler
-   * @param jobContext the job context
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  static void configureOutputStorageHandler(
-          JobContext jobContext) throws IOException {
-    configureOutputStorageHandler(jobContext,(List<String>)null);
-  }
-
-  /**
-   * Configure the output storage handler with allowing specification of missing dynamic partvals
-   * @param jobContext the job context
-   * @param dynamicPartVals
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  static void configureOutputStorageHandler(
-          JobContext jobContext, List<String> dynamicPartVals) throws IOException {
-      try {
-          OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-          HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(jobContext.getConfiguration(),jobInfo.getTableInfo().getStorerInfo());
-
-          Map<String, String> partitionValues = jobInfo.getPartitionValues();
-          String location = jobInfo.getLocation();
-
-          if (dynamicPartVals != null){
-            // dynamic part vals specified
-            List<String> dynamicPartKeys = jobInfo.getDynamicPartitioningKeys();
-            if (dynamicPartVals.size() != dynamicPartKeys.size()){
-              throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, 
-                  "Unable to configure dynamic partitioning for storage handler, mismatch between"
-                  + " number of partition values obtained["+dynamicPartVals.size()
-                  + "] and number of partition values required["+dynamicPartKeys.size()+"]");
-            }
-            for (int i = 0; i < dynamicPartKeys.size(); i++){
-              partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i));
-            }
+    /**
+     * Gets the table schema for the table specified in the HCatOutputFormat.setOutput call
+     * on the specified job context.
+     * @param context the context
+     * @return the table schema
+     * @throws IOException if HCatOutputFromat.setOutput has not been called for the passed context
+     */
+    public static HCatSchema getTableSchema(JobContext context) throws IOException {
+        OutputJobInfo jobInfo = getJobInfo(context);
+        return jobInfo.getTableInfo().getDataColumns();
+    }
+
+    /**
+     * Check for validity of the output-specification for the job.
+     * @param context information about the job
+     * @throws IOException when output should not be attempted
+     */
+    @Override
+    public void checkOutputSpecs(JobContext context
+    ) throws IOException, InterruptedException {
+        getOutputFormat(context).checkOutputSpecs(context);
+    }
+
+    /**
+     * Gets the output format instance.
+     * @param context the job context
+     * @return the output format instance
+     * @throws IOException
+     */
+    protected OutputFormat<WritableComparable<?>, HCatRecord> getOutputFormat(JobContext context) throws IOException {
+        OutputJobInfo jobInfo = getJobInfo(context);
+        HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+        //why do we need this?
+        configureOutputStorageHandler(context);
+        return storageHandler.getOutputFormatContainer(ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), context.getConfiguration()));
+    }
+
+    /**
+     * Gets the HCatOuputJobInfo object by reading the Configuration and deserializing
+     * the string. If InputJobInfo is not present in the configuration, throws an
+     * exception since that means HCatOutputFormat.setOutput has not been called.
+     * @param jobContext the job context
+     * @return the OutputJobInfo object
+     * @throws IOException the IO exception
+     */
+    public static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
+        String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        if (jobString == null) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED);
+        }
+
+        return (OutputJobInfo) HCatUtil.deserialize(jobString);
+    }
+
+    /**
+     * Configure the output storage handler
+     * @param jobContext the job context
+     * @throws IOException
+     */
+    @SuppressWarnings("unchecked")
+    static void configureOutputStorageHandler(
+        JobContext jobContext) throws IOException {
+        configureOutputStorageHandler(jobContext, (List<String>) null);
+    }
+
+    /**
+     * Configure the output storage handler with allowing specification of missing dynamic partvals
+     * @param jobContext the job context
+     * @param dynamicPartVals
+     * @throws IOException
+     */
+    @SuppressWarnings("unchecked")
+    static void configureOutputStorageHandler(
+        JobContext jobContext, List<String> dynamicPartVals) throws IOException {
+        try {
+            OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+            HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(jobContext.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+
+            Map<String, String> partitionValues = jobInfo.getPartitionValues();
+            String location = jobInfo.getLocation();
+
+            if (dynamicPartVals != null) {
+                // dynamic part vals specified
+                List<String> dynamicPartKeys = jobInfo.getDynamicPartitioningKeys();
+                if (dynamicPartVals.size() != dynamicPartKeys.size()) {
+                    throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
+                        "Unable to configure dynamic partitioning for storage handler, mismatch between"
+                            + " number of partition values obtained[" + dynamicPartVals.size()
+                            + "] and number of partition values required[" + dynamicPartKeys.size() + "]");
+                }
+                for (int i = 0; i < dynamicPartKeys.size(); i++) {
+                    partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i));
+                }
 
 //            // re-home location, now that we know the rest of the partvals
 //            Table table = jobInfo.getTableInfo().getTable();
@@ -140,85 +140,85 @@ public abstract class HCatBaseOutputForm
 //            for(FieldSchema schema : table.getPartitionKeys()) {
 //              partitionCols.add(schema.getName());
 //            }
-            jobInfo.setPartitionValues(partitionValues);
-          }
+                jobInfo.setPartitionValues(partitionValues);
+            }
 
-          HCatUtil.configureOutputStorageHandler(storageHandler,jobContext,jobInfo);
-      } catch(Exception e) {
-        if (e instanceof HCatException){
-          throw (HCatException)e;
-        }else{
-          throw new HCatException(ErrorType.ERROR_INIT_STORAGE_HANDLER, e);
+            HCatUtil.configureOutputStorageHandler(storageHandler, jobContext, jobInfo);
+        } catch (Exception e) {
+            if (e instanceof HCatException) {
+                throw (HCatException) e;
+            } else {
+                throw new HCatException(ErrorType.ERROR_INIT_STORAGE_HANDLER, e);
+            }
         }
-      }
-  }
+    }
 
-  /**
-   * Configure the output storage handler, with allowing specification 
-   * of partvals from which it picks the dynamic partvals
-   * @param context the job context
-   * @param jobInfo the output job info
-   * @param fullPartSpec
-   * @throws IOException
-   */
-
-  protected static void configureOutputStorageHandler(
-      JobContext context, OutputJobInfo jobInfo,
-      Map<String, String> fullPartSpec) throws IOException {
-    List<String> dynamicPartKeys = jobInfo.getDynamicPartitioningKeys();
-    if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){
-      configureOutputStorageHandler(context, (List<String>) null);
-    }else{
-      List<String> dynKeyVals = new ArrayList<String>();
-      for (String dynamicPartKey : dynamicPartKeys){
-        dynKeyVals.add(fullPartSpec.get(dynamicPartKey));
-      }
-      configureOutputStorageHandler(context, dynKeyVals);
-    }
-  }
-
-
-  protected static void setPartDetails(OutputJobInfo jobInfo, final HCatSchema schema,
-      Map<String, String> partMap) throws HCatException, IOException {
-    List<Integer> posOfPartCols = new ArrayList<Integer>();
-    List<Integer> posOfDynPartCols = new ArrayList<Integer>();
-
-    // If partition columns occur in data, we want to remove them.
-    // So, find out positions of partition columns in schema provided by user.
-    // We also need to update the output Schema with these deletions.
-    
-    // Note that, output storage handlers never sees partition columns in data
-    // or schema.
-
-    HCatSchema schemaWithoutParts = new HCatSchema(schema.getFields());
-    for(String partKey : partMap.keySet()){
-      Integer idx;
-      if((idx = schema.getPosition(partKey)) != null){
-        posOfPartCols.add(idx);
-        schemaWithoutParts.remove(schema.get(partKey));
-      }
-    }
-
-    // Also, if dynamic partitioning is being used, we want to
-    // set appropriate list of columns for the columns to be dynamically specified.
-    // These would be partition keys too, so would also need to be removed from 
-    // output schema and partcols
-
-    if (jobInfo.isDynamicPartitioningUsed()){
-      for (String partKey : jobInfo.getDynamicPartitioningKeys()){
-        Integer idx;
-        if((idx = schema.getPosition(partKey)) != null){
-          posOfPartCols.add(idx);
-          posOfDynPartCols.add(idx);
-          schemaWithoutParts.remove(schema.get(partKey));
+    /**
+     * Configure the output storage handler, with allowing specification
+     * of partvals from which it picks the dynamic partvals
+     * @param context the job context
+     * @param jobInfo the output job info
+     * @param fullPartSpec
+     * @throws IOException
+     */
+
+    protected static void configureOutputStorageHandler(
+        JobContext context, OutputJobInfo jobInfo,
+        Map<String, String> fullPartSpec) throws IOException {
+        List<String> dynamicPartKeys = jobInfo.getDynamicPartitioningKeys();
+        if ((dynamicPartKeys == null) || (dynamicPartKeys.isEmpty())) {
+            configureOutputStorageHandler(context, (List<String>) null);
+        } else {
+            List<String> dynKeyVals = new ArrayList<String>();
+            for (String dynamicPartKey : dynamicPartKeys) {
+                dynKeyVals.add(fullPartSpec.get(dynamicPartKey));
+            }
+            configureOutputStorageHandler(context, dynKeyVals);
         }
-      }
     }
-    
-    HCatUtil.validatePartitionSchema(
-        new Table(jobInfo.getTableInfo().getTable()), schemaWithoutParts);
-    jobInfo.setPosOfPartCols(posOfPartCols);
-    jobInfo.setPosOfDynPartCols(posOfDynPartCols);
-    jobInfo.setOutputSchema(schemaWithoutParts);
-  }
+
+
+    protected static void setPartDetails(OutputJobInfo jobInfo, final HCatSchema schema,
+                                         Map<String, String> partMap) throws HCatException, IOException {
+        List<Integer> posOfPartCols = new ArrayList<Integer>();
+        List<Integer> posOfDynPartCols = new ArrayList<Integer>();
+
+        // If partition columns occur in data, we want to remove them.
+        // So, find out positions of partition columns in schema provided by user.
+        // We also need to update the output Schema with these deletions.
+
+        // Note that, output storage handlers never sees partition columns in data
+        // or schema.
+
+        HCatSchema schemaWithoutParts = new HCatSchema(schema.getFields());
+        for (String partKey : partMap.keySet()) {
+            Integer idx;
+            if ((idx = schema.getPosition(partKey)) != null) {
+                posOfPartCols.add(idx);
+                schemaWithoutParts.remove(schema.get(partKey));
+            }
+        }
+
+        // Also, if dynamic partitioning is being used, we want to
+        // set appropriate list of columns for the columns to be dynamically specified.
+        // These would be partition keys too, so would also need to be removed from
+        // output schema and partcols
+
+        if (jobInfo.isDynamicPartitioningUsed()) {
+            for (String partKey : jobInfo.getDynamicPartitioningKeys()) {
+                Integer idx;
+                if ((idx = schema.getPosition(partKey)) != null) {
+                    posOfPartCols.add(idx);
+                    posOfDynPartCols.add(idx);
+                    schemaWithoutParts.remove(schema.get(partKey));
+                }
+            }
+        }
+
+        HCatUtil.validatePartitionSchema(
+            new Table(jobInfo.getTableInfo().getTable()), schemaWithoutParts);
+        jobInfo.setPosOfPartCols(posOfPartCols);
+        jobInfo.setPosOfDynPartCols(posOfDynPartCols);
+        jobInfo.setOutputSchema(schemaWithoutParts);
+    }
 }
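
To make the setPartDetails() bookkeeping above concrete: any partition column that appears in the user-supplied schema is removed from the schema handed to the storage handler, and its position is recorded so the partition value can be re-attached later. A rough sketch using only the HCatSchema calls that appear in this file (field names, types, and values are hypothetical):

    HCatSchema userSchema = new HCatSchema(new LinkedList<HCatFieldSchema>());
    userSchema.append(new HCatFieldSchema("a", HCatFieldSchema.Type.STRING, null));
    userSchema.append(new HCatFieldSchema("b", HCatFieldSchema.Type.INT, null));
    userSchema.append(new HCatFieldSchema("ds", HCatFieldSchema.Type.STRING, null)); // partition column

    Map<String, String> partMap = new HashMap<String, String>();
    partMap.put("ds", "2012-09-10");

    HCatSchema schemaWithoutParts = new HCatSchema(userSchema.getFields());
    for (String partKey : partMap.keySet()) {
        Integer idx = userSchema.getPosition(partKey);           // 2 for "ds"
        if (idx != null) {
            schemaWithoutParts.remove(userSchema.get(partKey));  // handler now sees only (a, b)
        }
    }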

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java Mon Sep 10 23:28:55 2012
@@ -25,23 +25,23 @@ import org.apache.hadoop.mapreduce.Job;
 /** The InputFormat to use to read data from HCatalog. */
 public class HCatInputFormat extends HCatBaseInputFormat {
 
-  /**
-   * Set the input information to use for the job. This queries the metadata server 
-   * with the specified partition predicates, gets the matching partitions, and 
-   * puts the information in the conf object. The inputInfo object is updated 
-   * with information needed in the client context.
-   * @param job the job object
-   * @param inputJobInfo the input information about the table to read
-   * @throws IOException the exception in communicating with the metadata server
-   */
-  public static void setInput(Job job,
-      InputJobInfo inputJobInfo) throws IOException {
-    try {
-      InitializeInput.setInput(job, inputJobInfo);
-    } catch (Exception e) {
-      throw new IOException(e);
+    /**
+     * Set the input information to use for the job. This queries the metadata server
+     * with the specified partition predicates, gets the matching partitions, and
+     * puts the information in the conf object. The inputInfo object is updated
+     * with information needed in the client context.
+     * @param job the job object
+     * @param inputJobInfo the input information about the table to read
+     * @throws IOException the exception in communicating with the metadata server
+     */
+    public static void setInput(Job job,
+                                InputJobInfo inputJobInfo) throws IOException {
+        try {
+            InitializeInput.setInput(job, inputJobInfo);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
     }
-  }
 
 
 }
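
A typical read-side driver for setInput() looks roughly like this. It is a sketch only: the database, table, filter, and mapper class are placeholders, and the three-argument InputJobInfo.create() factory is assumed rather than taken from this diff:

    Job job = new Job(new Configuration(), "hcat-read");
    HCatInputFormat.setInput(job,
        InputJobInfo.create("default", "web_logs", "ds=\"2012-09-10\""));  // placeholder db/table/filter
    job.setInputFormatClass(HCatInputFormat.class);
    job.setMapperClass(MyMapper.class);   // hypothetical Mapper<WritableComparable, HCatRecord, ...>
    // ... configure the output side, then job.waitForCompletion(true)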

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Mon Sep 10 23:28:55 2012
@@ -66,135 +66,135 @@ public class HCatOutputFormat extends HC
      */
     @SuppressWarnings("unchecked")
     public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException {
-      HiveMetaStoreClient client = null;
+        HiveMetaStoreClient client = null;
 
-      try {
+        try {
 
-        Configuration conf = job.getConfiguration();
-        HiveConf hiveConf = HCatUtil.getHiveConf(conf);
-        client = HCatUtil.getHiveClient(hiveConf);
-        Table table = HCatUtil.getTable(client, outputJobInfo.getDatabaseName(),
-            outputJobInfo.getTableName());
-
-        List<String> indexList = client.listIndexNames(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName(), Short.MAX_VALUE);
-
-        for (String indexName : indexList) {
-            Index index = client.getIndex(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName(), indexName);
-            if (!index.isDeferredRebuild()) {
-                throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a table with an automatic index from Pig/Mapreduce is not supported");
+            Configuration conf = job.getConfiguration();
+            HiveConf hiveConf = HCatUtil.getHiveConf(conf);
+            client = HCatUtil.getHiveClient(hiveConf);
+            Table table = HCatUtil.getTable(client, outputJobInfo.getDatabaseName(),
+                outputJobInfo.getTableName());
+
+            List<String> indexList = client.listIndexNames(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName(), Short.MAX_VALUE);
+
+            for (String indexName : indexList) {
+                Index index = client.getIndex(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName(), indexName);
+                if (!index.isDeferredRebuild()) {
+                    throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a table with an automatic index from Pig/Mapreduce is not supported");
+                }
             }
-        }
-        StorageDescriptor sd = table.getTTable().getSd();
+            StorageDescriptor sd = table.getTTable().getSd();
 
-        if (sd.isCompressed()) {
-            throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a compressed partition from Pig/Mapreduce is not supported");
-        }
+            if (sd.isCompressed()) {
+                throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a compressed partition from Pig/Mapreduce is not supported");
+            }
 
-        if (sd.getBucketCols()!=null && !sd.getBucketCols().isEmpty()) {
-            throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with bucket definition from Pig/Mapreduce is not supported");
-        }
+            if (sd.getBucketCols() != null && !sd.getBucketCols().isEmpty()) {
+                throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with bucket definition from Pig/Mapreduce is not supported");
+            }
 
-        if (sd.getSortCols()!=null && !sd.getSortCols().isEmpty()) {
-            throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with sorted column definition from Pig/Mapreduce is not supported");
-        }
+            if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
+                throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with sorted column definition from Pig/Mapreduce is not supported");
+            }
 
-        if (table.getTTable().getPartitionKeysSize() == 0 ){
-          if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())){
-            // attempt made to save partition values in non-partitioned table - throw error.
-            throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
-                "Partition values specified for non-partitioned table");
-          }
-          // non-partitioned table
-          outputJobInfo.setPartitionValues(new HashMap<String, String>());
-
-        } else {
-          // partitioned table, we expect partition values
-          // convert user specified map to have lower case key names
-          Map<String, String> valueMap = new HashMap<String, String>();
-          if (outputJobInfo.getPartitionValues() != null){
-            for(Map.Entry<String, String> entry : outputJobInfo.getPartitionValues().entrySet()) {
-              valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
-            }
-          }
-
-          if ((outputJobInfo.getPartitionValues() == null)
-              || (outputJobInfo.getPartitionValues().size() < table.getTTable().getPartitionKeysSize())){
-            // dynamic partition usecase - partition values were null, or not all were specified
-            // need to figure out which keys are not specified.
-            List<String> dynamicPartitioningKeys = new ArrayList<String>();
-            boolean firstItem = true;
-            for (FieldSchema fs : table.getPartitionKeys()){
-              if (!valueMap.containsKey(fs.getName().toLowerCase())){
-                dynamicPartitioningKeys.add(fs.getName().toLowerCase());
-              }
-            }
-
-            if (valueMap.size() + dynamicPartitioningKeys.size() != table.getTTable().getPartitionKeysSize()){
-              // If this isn't equal, then bogus key values have been inserted, error out.
-              throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified");
-            }
-
-            outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
-            String dynHash;
-            if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){
-              dynHash = String.valueOf(Math.random());
+            if (table.getTTable().getPartitionKeysSize() == 0) {
+                if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())) {
+                    // attempt made to save partition values in non-partitioned table - throw error.
+                    throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
+                        "Partition values specified for non-partitioned table");
+                }
+                // non-partitioned table
+                outputJobInfo.setPartitionValues(new HashMap<String, String>());
+
+            } else {
+                // partitioned table, we expect partition values
+                // convert user specified map to have lower case key names
+                Map<String, String> valueMap = new HashMap<String, String>();
+                if (outputJobInfo.getPartitionValues() != null) {
+                    for (Map.Entry<String, String> entry : outputJobInfo.getPartitionValues().entrySet()) {
+                        valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
+                    }
+                }
+
+                if ((outputJobInfo.getPartitionValues() == null)
+                    || (outputJobInfo.getPartitionValues().size() < table.getTTable().getPartitionKeysSize())) {
+                    // dynamic partition usecase - partition values were null, or not all were specified
+                    // need to figure out which keys are not specified.
+                    List<String> dynamicPartitioningKeys = new ArrayList<String>();
+                    boolean firstItem = true;
+                    for (FieldSchema fs : table.getPartitionKeys()) {
+                        if (!valueMap.containsKey(fs.getName().toLowerCase())) {
+                            dynamicPartitioningKeys.add(fs.getName().toLowerCase());
+                        }
+                    }
+
+                    if (valueMap.size() + dynamicPartitioningKeys.size() != table.getTTable().getPartitionKeysSize()) {
+                        // If this isn't equal, then bogus key values have been inserted, error out.
+                        throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, "Invalid partition keys specified");
+                    }
+
+                    outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
+                    String dynHash;
+                    if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null) {
+                        dynHash = String.valueOf(Math.random());
 //              LOG.info("New dynHash : ["+dynHash+"]");
 //            }else{
 //              LOG.info("Old dynHash : ["+dynHash+"]");
-            }
-            conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash);
+                    }
+                    conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash);
 
-          }
+                }
 
-          outputJobInfo.setPartitionValues(valueMap);
-        }
+                outputJobInfo.setPartitionValues(valueMap);
+            }
 
-        HCatSchema tableSchema = HCatUtil.extractSchema(table);
-        StorerInfo storerInfo =
-            InternalUtil.extractStorerInfo(table.getTTable().getSd(), table.getParameters());
-
-        List<String> partitionCols = new ArrayList<String>();
-        for(FieldSchema schema : table.getPartitionKeys()) {
-          partitionCols.add(schema.getName());
-        }
+            HCatSchema tableSchema = HCatUtil.extractSchema(table);
+            StorerInfo storerInfo =
+                InternalUtil.extractStorerInfo(table.getTTable().getSd(), table.getParameters());
+
+            List<String> partitionCols = new ArrayList<String>();
+            for (FieldSchema schema : table.getPartitionKeys()) {
+                partitionCols.add(schema.getName());
+            }
 
-       HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(job.getConfiguration(), storerInfo);
+            HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(job.getConfiguration(), storerInfo);
 
-        //Serialize the output info into the configuration
-        outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
-        outputJobInfo.setOutputSchema(tableSchema);
-        harRequested = getHarRequested(hiveConf);
-        outputJobInfo.setHarRequested(harRequested);
-        maxDynamicPartitions = getMaxDynamicPartitions(hiveConf);
-        outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
-
-        HCatUtil.configureOutputStorageHandler(storageHandler,job,outputJobInfo);
-
-        Path tblPath = new Path(table.getTTable().getSd().getLocation());
-
-        /*  Set the umask in conf such that files/dirs get created with table-dir
-         * permissions. Following three assumptions are made:
-         * 1. Actual files/dirs creation is done by RecordWriter of underlying
-         * output format. It is assumed that they use default permissions while creation.
-         * 2. Default Permissions = FsPermission.getDefault() = 777.
-         * 3. UMask is honored by underlying filesystem.
-         */
+            //Serialize the output info into the configuration
+            outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
+            outputJobInfo.setOutputSchema(tableSchema);
+            harRequested = getHarRequested(hiveConf);
+            outputJobInfo.setHarRequested(harRequested);
+            maxDynamicPartitions = getMaxDynamicPartitions(hiveConf);
+            outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
+
+            HCatUtil.configureOutputStorageHandler(storageHandler, job, outputJobInfo);
+
+            Path tblPath = new Path(table.getTTable().getSd().getLocation());
+
+            /*  Set the umask in conf such that files/dirs get created with table-dir
+            * permissions. Following three assumptions are made:
+            * 1. Actual files/dirs creation is done by RecordWriter of underlying
+            * output format. It is assumed that they use default permissions while creation.
+            * 2. Default Permissions = FsPermission.getDefault() = 777.
+            * 3. UMask is honored by underlying filesystem.
+            */
 
-        FsPermission.setUMask(conf, FsPermission.getDefault().applyUMask(
-            tblPath.getFileSystem(conf).getFileStatus(tblPath).getPermission()));
+            FsPermission.setUMask(conf, FsPermission.getDefault().applyUMask(
+                tblPath.getFileSystem(conf).getFileStatus(tblPath).getPermission()));
 
-        if(Security.getInstance().isSecurityEnabled()) {
-            Security.getInstance().handleSecurity(job, outputJobInfo, client, conf, harRequested);
-        }
-      } catch(Exception e) {
-        if( e instanceof HCatException ) {
-          throw (HCatException) e;
-        } else {
-          throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+            if (Security.getInstance().isSecurityEnabled()) {
+                Security.getInstance().handleSecurity(job, outputJobInfo, client, conf, harRequested);
+            }
+        } catch (Exception e) {
+            if (e instanceof HCatException) {
+                throw (HCatException) e;
+            } else {
+                throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+            }
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
         }
-      } finally {
-        HCatUtil.closeHiveClientQuietly(client);
-      }
     }
 
     /**
@@ -207,7 +207,7 @@ public class HCatOutputFormat extends HC
     public static void setSchema(final Job job, final HCatSchema schema) throws IOException {
 
         OutputJobInfo jobInfo = getJobInfo(job);
-        Map<String,String> partMap = jobInfo.getPartitionValues();
+        Map<String, String> partMap = jobInfo.getPartitionValues();
         setPartDetails(jobInfo, schema, partMap);
         job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
     }
@@ -222,9 +222,9 @@ public class HCatOutputFormat extends HC
      */
     @Override
     public RecordWriter<WritableComparable<?>, HCatRecord>
-        getRecordWriter(TaskAttemptContext context)
+    getRecordWriter(TaskAttemptContext context)
         throws IOException, InterruptedException {
-      return getOutputFormat(context).getRecordWriter(context);
+        return getOutputFormat(context).getRecordWriter(context);
     }
 
 
@@ -238,25 +238,25 @@ public class HCatOutputFormat extends HC
      */
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context
-                                       ) throws IOException, InterruptedException {
+    ) throws IOException, InterruptedException {
         return getOutputFormat(context).getOutputCommitter(context);
     }
 
     private static int getMaxDynamicPartitions(HiveConf hConf) {
-      // by default the bounds checking for maximum number of
-      // dynamic partitions is disabled (-1)
-      int maxDynamicPartitions = -1;
-
-      if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
-        maxDynamicPartitions = hConf.getIntVar(
-                                HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
-      }
+        // by default the bounds checking for maximum number of
+        // dynamic partitions is disabled (-1)
+        int maxDynamicPartitions = -1;
+
+        if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED) {
+            maxDynamicPartitions = hConf.getIntVar(
+                HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+        }
 
-      return maxDynamicPartitions;
+        return maxDynamicPartitions;
     }
 
     private static boolean getHarRequested(HiveConf hConf) {
-      return hConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
+        return hConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
     }
 
 }
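
The corresponding write-side driver for setOutput()/setSchema() is along these lines; again a sketch, with placeholder database, table, and partition values, and assuming the OutputJobInfo.create(db, table, partitionValues) factory:

    Job job = new Job(new Configuration(), "hcat-write");
    Map<String, String> partitionValues = new HashMap<String, String>();
    partitionValues.put("ds", "2012-09-10");           // omit keys here to fall back to dynamic partitioning

    HCatOutputFormat.setOutput(job,
        OutputJobInfo.create("default", "web_logs_summary", partitionValues));
    HCatSchema schema = HCatOutputFormat.getTableSchema(job);   // data columns of the target table
    HCatOutputFormat.setSchema(job, schema);
    job.setOutputFormatClass(HCatOutputFormat.class);
    // reducers then emit (e.g. NullWritable, HCatRecord) pairs that match "schema"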

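As context for the setSchema(Job, HCatSchema) entry point reindented above, a minimal driver sketch of how a writing job is typically wired up follows. It assumes the usual HCatalog signatures for OutputJobInfo.create, HCatFieldSchema, and HCatSchema; the database, table, and column names are placeholders.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatOutputFormat;
    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    public class OutputDriverSketch {
        public static void configure(Job job) throws Exception {
            // Point the job at a target table; a null partition-value map requests
            // dynamic partitioning, which getMaxDynamicPartitions() above bounds.
            HCatOutputFormat.setOutput(job, OutputJobInfo.create("mydb", "mytable", null));

            // Declare the record schema the tasks will emit.
            List<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
            cols.add(new HCatFieldSchema("msg", HCatFieldSchema.Type.STRING, null));
            HCatOutputFormat.setSchema(job, new HCatSchema(cols));

            job.setOutputFormatClass(HCatOutputFormat.class);
        }
    }

On a partitioned table, the null map would instead carry the static partition key/value pairs.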
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Mon Sep 10 23:28:55 2012
@@ -61,7 +61,7 @@ class HCatRecordReader extends RecordRea
 
     private Deserializer deserializer;
 
-    private Map<String,String> valuesNotInDataCols;
+    private Map<String, String> valuesNotInDataCols;
 
     private HCatSchema outputSchema = null;
     private HCatSchema dataSchema = null;
@@ -70,9 +70,9 @@ class HCatRecordReader extends RecordRea
      * Instantiates a new hcat record reader.
      */
     public HCatRecordReader(HCatStorageHandler storageHandler,
-                     Map<String,String> valuesNotInDataCols) {
-      this.storageHandler = storageHandler;
-      this.valuesNotInDataCols = valuesNotInDataCols;
+                            Map<String, String> valuesNotInDataCols) {
+        this.storageHandler = storageHandler;
+        this.valuesNotInDataCols = valuesNotInDataCols;
     }
 
     /* (non-Javadoc)
@@ -82,62 +82,62 @@ class HCatRecordReader extends RecordRea
      */
     @Override
     public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
-        TaskAttemptContext taskContext) throws IOException, InterruptedException {
+                           TaskAttemptContext taskContext) throws IOException, InterruptedException {
 
-      HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
+        HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
 
-      baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext);
-      createDeserializer(hcatSplit, storageHandler, taskContext);
+        baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext);
+        createDeserializer(hcatSplit, storageHandler, taskContext);
 
-      // Pull the output schema out of the TaskAttemptContext
-      outputSchema = (HCatSchema) HCatUtil.deserialize(
-          taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA));
+        // Pull the output schema out of the TaskAttemptContext
+        outputSchema = (HCatSchema) HCatUtil.deserialize(
+            taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA));
 
-      if (outputSchema == null) {
-        outputSchema = hcatSplit.getTableSchema();
-      }
+        if (outputSchema == null) {
+            outputSchema = hcatSplit.getTableSchema();
+        }
 
-      // Pull the table schema out of the Split info
-      // TODO This should be passed in the TaskAttemptContext instead
-      dataSchema = hcatSplit.getDataSchema();
+        // Pull the table schema out of the Split info
+        // TODO This should be passed in the TaskAttemptContext instead
+        dataSchema = hcatSplit.getDataSchema();
 
-      errorTracker = new InputErrorTracker(taskContext.getConfiguration());
+        errorTracker = new InputErrorTracker(taskContext.getConfiguration());
     }
 
     private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit,
-        HCatStorageHandler storageHandler, TaskAttemptContext taskContext) throws IOException {
+                                                                         HCatStorageHandler storageHandler, TaskAttemptContext taskContext) throws IOException {
 
-      JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext);
-      HCatUtil.copyJobPropertiesToJobConf(hcatSplit.getPartitionInfo().getJobProperties(), jobConf);
-      org.apache.hadoop.mapred.InputFormat inputFormat =
-          HCatInputFormat.getMapRedInputFormat(jobConf, storageHandler.getInputFormatClass());
-      return inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf,
-          InternalUtil.createReporter(taskContext));
+        JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext);
+        HCatUtil.copyJobPropertiesToJobConf(hcatSplit.getPartitionInfo().getJobProperties(), jobConf);
+        org.apache.hadoop.mapred.InputFormat inputFormat =
+            HCatInputFormat.getMapRedInputFormat(jobConf, storageHandler.getInputFormatClass());
+        return inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf,
+            InternalUtil.createReporter(taskContext));
     }
 
     private void createDeserializer(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
-        TaskAttemptContext taskContext) throws IOException {
+                                    TaskAttemptContext taskContext) throws IOException {
 
-      deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
-          taskContext.getConfiguration());
+        deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
+            taskContext.getConfiguration());
 
-      try {
-        InternalUtil.initializeDeserializer(deserializer, storageHandler.getConf(),
-            hcatSplit.getPartitionInfo().getTableInfo(),
-            hcatSplit.getPartitionInfo().getPartitionSchema());
-      } catch (SerDeException e) {
-        throw new IOException("Failed initializing deserializer "
-            + storageHandler.getSerDeClass().getName(), e);
-      }
+        try {
+            InternalUtil.initializeDeserializer(deserializer, storageHandler.getConf(),
+                hcatSplit.getPartitionInfo().getTableInfo(),
+                hcatSplit.getPartitionInfo().getPartitionSchema());
+        } catch (SerDeException e) {
+            throw new IOException("Failed initializing deserializer "
+                + storageHandler.getSerDeClass().getName(), e);
+        }
     }
 
-  /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
-     */
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
+     */
     @Override
     public WritableComparable getCurrentKey()
-    throws IOException, InterruptedException {
-      return currentKey;
+        throws IOException, InterruptedException {
+        return currentKey;
     }
 
     /* (non-Javadoc)
@@ -145,140 +145,140 @@ class HCatRecordReader extends RecordRea
      */
     @Override
     public HCatRecord getCurrentValue() throws IOException, InterruptedException {
-      return currentHCatRecord;
+        return currentHCatRecord;
     }
 
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
      */
     @Override
-    public float getProgress()  {
+    public float getProgress() {
         try {
-          return baseRecordReader.getProgress();
+            return baseRecordReader.getProgress();
         } catch (IOException e) {
-            LOG.warn("Exception in HCatRecord reader",e);
+            LOG.warn("Exception in HCatRecord reader", e);
         }
         return 0.0f; // errored
     }
 
-  /**
-   * Check if the wrapped RecordReader has another record, and if so convert it into an
-   * HCatRecord. We both check for records and convert here so a configurable percent of
-   * bad records can be tolerated.
-   *
-   * @return if there is a next record
-   * @throws IOException on error
-   * @throws InterruptedException on error
-   */
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (currentKey == null) {
-      currentKey = baseRecordReader.createKey();
-      currentValue = baseRecordReader.createValue();
-    }
-
-    while (baseRecordReader.next(currentKey, currentValue)) {
-      HCatRecord r = null;
-      Throwable t = null;
-
-      errorTracker.incRecords();
-
-      try {
-        Object o = deserializer.deserialize(currentValue);
-        r = new LazyHCatRecord(o, deserializer.getObjectInspector());
-      } catch (Throwable throwable) {
-        t = throwable;
-      }
-
-      if (r == null) {
-        errorTracker.incErrors(t);
-        continue;
-      }
-
-      DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
-      int i = 0;
-      for (String fieldName : outputSchema.getFieldNames()) {
-        if (dataSchema.getPosition(fieldName) != null) {
-          dr.set(i, r.get(fieldName, dataSchema));
-        } else {
-          dr.set(i, valuesNotInDataCols.get(fieldName));
-        }
-        i++;
-      }
-
-      currentHCatRecord = dr;
-      return true;
-    }
+    /**
+     * Check if the wrapped RecordReader has another record, and if so convert it into an
+     * HCatRecord. We both check for records and convert here so a configurable percent of
+     * bad records can be tolerated.
+     *
+     * @return true if there is a next record
+     * @throws IOException on error
+     * @throws InterruptedException on error
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (currentKey == null) {
+            currentKey = baseRecordReader.createKey();
+            currentValue = baseRecordReader.createValue();
+        }
 
-    return false;
-  }
+        while (baseRecordReader.next(currentKey, currentValue)) {
+            HCatRecord r = null;
+            Throwable t = null;
+
+            errorTracker.incRecords();
+
+            try {
+                Object o = deserializer.deserialize(currentValue);
+                r = new LazyHCatRecord(o, deserializer.getObjectInspector());
+            } catch (Throwable throwable) {
+                t = throwable;
+            }
+
+            if (r == null) {
+                errorTracker.incErrors(t);
+                continue;
+            }
+
+            DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
+            int i = 0;
+            for (String fieldName : outputSchema.getFieldNames()) {
+                if (dataSchema.getPosition(fieldName) != null) {
+                    dr.set(i, r.get(fieldName, dataSchema));
+                } else {
+                    dr.set(i, valuesNotInDataCols.get(fieldName));
+                }
+                i++;
+            }
 
-  /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#close()
-     */
+            currentHCatRecord = dr;
+            return true;
+        }
+
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordReader#close()
+     */
     @Override
     public void close() throws IOException {
         baseRecordReader.close();
     }
 
-  /**
-   * Tracks number of of errors in input and throws a Runtime exception
-   * if the rate of errors crosses a limit.
-   * <br/>
-   * The intention is to skip over very rare file corruption or incorrect
-   * input, but catch programmer errors (incorrect format, or incorrect
-   * deserializers etc).
-   *
-   * This class was largely copied from Elephant-Bird (thanks @rangadi!)
-   * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java
-   */
-  static class InputErrorTracker {
-    long numRecords;
-    long numErrors;
-
-    double errorThreshold; // max fraction of errors allowed
-    long minErrors; // throw error only after this many errors
-
-    InputErrorTracker(Configuration conf) {
-      errorThreshold = conf.getFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY,
-          HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT);
-      minErrors = conf.getLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY,
-          HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_DEFAULT);
-      numRecords = 0;
-      numErrors = 0;
-    }
-
-    void incRecords() {
-      numRecords++;
-    }
-
-    void incErrors(Throwable cause) {
-      numErrors++;
-      if (numErrors > numRecords) {
-        // incorrect use of this class
-        throw new RuntimeException("Forgot to invoke incRecords()?");
-      }
-
-      if (cause == null) {
-        cause = new Exception("Unknown error");
-      }
-
-      if (errorThreshold <= 0) { // no errors are tolerated
-        throw new RuntimeException("error while reading input records", cause);
-      }
-
-      LOG.warn("Error while reading an input record ("
-          + numErrors + " out of " + numRecords + " so far ): ", cause);
-
-      double errRate = numErrors / (double) numRecords;
-
-      // will always excuse the first error. We can decide if single
-      // error crosses threshold inside close() if we want to.
-      if (numErrors >= minErrors && errRate > errorThreshold) {
-        LOG.error(numErrors + " out of " + numRecords
-            + " crosses configured threshold (" + errorThreshold + ")");
-        throw new RuntimeException("error rate while reading input records crossed threshold", cause);
-      }
+    /**
+     * Tracks the number of errors in the input and throws a RuntimeException
+     * if the rate of errors crosses a limit.
+     * <br/>
+     * The intention is to skip over very rare file corruption or incorrect
+     * input, but catch programmer errors (incorrect format, or incorrect
+     * deserializers, etc.).
+     *
+     * This class was largely copied from Elephant-Bird (thanks @rangadi!)
+     * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java
+     */
+    static class InputErrorTracker {
+        long numRecords;
+        long numErrors;
+
+        double errorThreshold; // max fraction of errors allowed
+        long minErrors; // throw error only after this many errors
+
+        InputErrorTracker(Configuration conf) {
+            errorThreshold = conf.getFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY,
+                HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT);
+            minErrors = conf.getLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY,
+                HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_DEFAULT);
+            numRecords = 0;
+            numErrors = 0;
+        }
+
+        void incRecords() {
+            numRecords++;
+        }
+
+        void incErrors(Throwable cause) {
+            numErrors++;
+            if (numErrors > numRecords) {
+                // incorrect use of this class
+                throw new RuntimeException("Forgot to invoke incRecords()?");
+            }
+
+            if (cause == null) {
+                cause = new Exception("Unknown error");
+            }
+
+            if (errorThreshold <= 0) { // no errors are tolerated
+                throw new RuntimeException("error while reading input records", cause);
+            }
+
+            LOG.warn("Error while reading an input record ("
+                + numErrors + " out of " + numRecords + " so far ): ", cause);
+
+            double errRate = numErrors / (double) numRecords;
+
+            // will always excuse the first error. We can decide if a single
+            // error crosses the threshold inside close() if we want to.
+            if (numErrors >= minErrors && errRate > errorThreshold) {
+                LOG.error(numErrors + " out of " + numRecords
+                    + " crosses configured threshold (" + errorThreshold + ")");
+                throw new RuntimeException("error rate while reading input records crossed threshold", cause);
+            }
+        }
     }
-  }
 }

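The nextKeyValue()/InputErrorTracker rework above preserves the bad-record tolerance described in its javadoc: a record whose deserialization throws is skipped, and the task fails only once at least HCAT_INPUT_BAD_RECORD_MIN_KEY errors have been seen and the running error rate exceeds HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY. A hedged sketch of tuning those two knobs from a client job follows; the numeric values are illustrative, not the shipped defaults.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.common.HCatConstants;

    public class BadRecordTuningSketch {
        public static void tolerateRareCorruption(Configuration conf) {
            // Allow up to 0.1% of input records to fail deserialization...
            conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, 0.001f);
            // ...but never fail the task on fewer than 5 absolute errors.
            conf.setLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY, 5L);
        }
    }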
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java Mon Sep 10 23:28:55 2012
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 
 /** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
 public class HCatSplit extends InputSplit
-  implements Writable,org.apache.hadoop.mapred.InputSplit {
+    implements Writable, org.apache.hadoop.mapred.InputSplit {
 
     private static final Logger LOG = LoggerFactory.getLogger(HCatSplit.class);
     /** The partition info for the split. */
@@ -61,13 +61,13 @@ public class HCatSplit extends InputSpli
      * @param tableSchema the table level schema
      */
     public HCatSplit(PartInfo partitionInfo,
-        org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
-        HCatSchema tableSchema) {
+                     org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
+                     HCatSchema tableSchema) {
 
-      this.partitionInfo = partitionInfo;
-      // dataSchema can be obtained from partitionInfo.getPartitionSchema()
-      this.baseMapRedSplit = baseMapRedSplit;
-      this.tableSchema = tableSchema;
+        this.partitionInfo = partitionInfo;
+        // dataSchema can be obtained from partitionInfo.getPartitionSchema()
+        this.baseMapRedSplit = baseMapRedSplit;
+        this.tableSchema = tableSchema;
     }
 
     /**
@@ -99,7 +99,7 @@ public class HCatSplit extends InputSpli
      * @return the table schema
      */
     public HCatSchema getTableSchema() {
-      return this.tableSchema;
+        return this.tableSchema;
     }
 
     /* (non-Javadoc)
@@ -108,9 +108,9 @@ public class HCatSplit extends InputSpli
     @Override
     public long getLength() {
         try {
-          return baseMapRedSplit.getLength();
+            return baseMapRedSplit.getLength();
         } catch (IOException e) {
-          LOG.warn("Exception in HCatSplit",e);
+            LOG.warn("Exception in HCatSplit", e);
         }
         return 0; // we errored
     }
@@ -121,9 +121,9 @@ public class HCatSplit extends InputSpli
     @Override
     public String[] getLocations() {
         try {
-          return baseMapRedSplit.getLocations();
+            return baseMapRedSplit.getLocations();
         } catch (IOException e) {
-            LOG.warn("Exception in HCatSplit",e);
+            LOG.warn("Exception in HCatSplit", e);
         }
         return new String[0]; // we errored
     }
@@ -139,23 +139,23 @@ public class HCatSplit extends InputSpli
 
         String baseSplitClassName = WritableUtils.readString(input);
         org.apache.hadoop.mapred.InputSplit split;
-        try{
+        try {
             Class<? extends org.apache.hadoop.mapred.InputSplit> splitClass =
                 (Class<? extends org.apache.hadoop.mapred.InputSplit>) Class.forName(baseSplitClassName);
 
             //Class.forName().newInstance() does not work if the underlying
             //InputSplit has package visibility
             Constructor<? extends org.apache.hadoop.mapred.InputSplit>
-              constructor =
+                constructor =
                 splitClass.getDeclaredConstructor(new Class[]{});
             constructor.setAccessible(true);
 
             split = constructor.newInstance();
             // read baseSplit from input
-            ((Writable)split).readFields(input);
+            ((Writable) split).readFields(input);
             this.baseMapRedSplit = split;
-        }catch(Exception e){
-            throw new IOException ("Exception from " + baseSplitClassName, e);
+        } catch (Exception e) {
+            throw new IOException("Exception from " + baseSplitClassName, e);
         }
 
         String tableSchemaString = WritableUtils.readString(input);
@@ -173,7 +173,7 @@ public class HCatSplit extends InputSpli
         WritableUtils.writeString(output, partitionInfoString);
 
         WritableUtils.writeString(output, baseMapRedSplit.getClass().getName());
-        Writable baseSplitWritable = (Writable)baseMapRedSplit;
+        Writable baseSplitWritable = (Writable) baseMapRedSplit;
         //write  baseSplit into output
         baseSplitWritable.write(output);
 

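The readFields() change above keeps the reflection trick HCatSplit depends on: Class.forName(...).newInstance() fails for package-private InputSplit implementations, so the declared no-arg constructor is fetched and made accessible before instantiation. A standalone sketch of that pattern (the helper name is invented for illustration):

    import java.lang.reflect.Constructor;

    public class AccessibleInstantiationSketch {
        @SuppressWarnings("unchecked")
        public static <T> T newInstanceByName(String className) throws Exception {
            Class<T> clazz = (Class<T>) Class.forName(className);
            // Fetch the declared no-arg constructor, which may be non-public...
            Constructor<T> ctor = clazz.getDeclaredConstructor();
            // ...and bypass the visibility check before instantiating.
            ctor.setAccessible(true);
            return ctor.newInstance();
        }
    }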
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java Mon Sep 10 23:28:55 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.OutputFo
 public abstract class HCatStorageHandler implements HiveStorageHandler {
 
     //TODO move this to HiveStorageHandler
+
     /**
      * This method is called to allow the StorageHandlers the chance
      * to populate the JobContext.getConfiguration() with properties that
@@ -53,6 +54,7 @@ public abstract class HCatStorageHandler
     public abstract void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties);
 
     //TODO move this to HiveStorageHandler
+
     /**
      * This method is called to allow the StorageHandlers the chance
      * to populate the JobContext.getConfiguration() with properties that
@@ -72,46 +74,46 @@ public abstract class HCatStorageHandler
     public abstract void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties);
 
     /**
-     * 
-     * 
+     *
+     *
      * @return authorization provider
      * @throws HiveException
      */
     public abstract HiveAuthorizationProvider getAuthorizationProvider()
-            throws HiveException;
-    
+        throws HiveException;
+
     /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.hadoop.hive.ql.metadata.HiveStorageHandler#
-     * configureTableJobProperties(org.apache.hadoop.hive.ql.plan.TableDesc,
-     * java.util.Map)
-     */
+     * (non-Javadoc)
+     *
+     * @see org.apache.hadoop.hive.ql.metadata.HiveStorageHandler#
+     * configureTableJobProperties(org.apache.hadoop.hive.ql.plan.TableDesc,
+     * java.util.Map)
+     */
     @Override
     @Deprecated
     public final void configureTableJobProperties(TableDesc tableDesc,
-            Map<String, String> jobProperties) {
+                                                  Map<String, String> jobProperties) {
     }
-    
+
     /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.hadoop.conf.Configurable#getConf()
-     */
+     * (non-Javadoc)
+     *
+     * @see org.apache.hadoop.conf.Configurable#getConf()
+     */
     @Override
     public abstract Configuration getConf();
-    
+
     /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.
-     * Configuration)
-     */
+     * (non-Javadoc)
+     *
+     * @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.
+     * Configuration)
+     */
     @Override
     public abstract void setConf(Configuration conf);
 
     OutputFormatContainer getOutputFormatContainer(OutputFormat outputFormat) {
         return new DefaultOutputFormatContainer(outputFormat);
     }
-    
+
 }
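The configureInputJobProperties/configureOutputJobProperties hooks touched above are where a concrete handler pushes table-level settings into the per-job map so they reach JobContext.getConfiguration(). A hedged sketch of what one such override might do, written as a free-standing helper; the property name is hypothetical.

    import java.util.Map;

    import org.apache.hadoop.hive.ql.plan.TableDesc;

    public class JobPropertiesSketch {
        public static void copyCustomOption(TableDesc tableDesc, Map<String, String> jobProperties) {
            // Propagate a (hypothetical) table property into the job so the
            // underlying InputFormat/SerDe can see it at task time.
            String option = tableDesc.getProperties().getProperty("my.storage.custom.option");
            if (option != null) {
                jobProperties.put("my.storage.custom.option", option);
            }
        }
    }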