Posted to hcatalog-commits@incubator.apache.org by tr...@apache.org on 2012/09/10 23:29:03 UTC

svn commit: r1383152 [12/27] - in /incubator/hcatalog/trunk: ./ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ ...

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Mon Sep 10 23:28:55 2012
@@ -35,150 +35,152 @@ import org.apache.hcatalog.data.schema.H
 public class HCatTableInfo implements Serializable {
 
 
-  private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-  /** The db and table names */
-  private final String databaseName;
-  private final String tableName;
-
-  /** The table schema. */
-  private final HCatSchema dataColumns;
-  private final HCatSchema partitionColumns;
-
-  /** The table being written to */
-  private final Table table;
-
-  /** The storer info */
-  private StorerInfo storerInfo;
-
-  /**
-   * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
-   * for reading data from a table.
-   * @param databaseName the db name
-   * @param tableName the table name
-   * @param dataColumns schema of columns which contain data
-   * @param partitionColumns schema of partition columns
-   * @param storerInfo information about storage descriptor
-   * @param table hive metastore table class
-   */
-  HCatTableInfo(
-      String databaseName,
-      String tableName,
-      HCatSchema dataColumns,
-      HCatSchema partitionColumns,
-      StorerInfo storerInfo,
-      Table table) {
-    this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
-    this.tableName = tableName;
-    this.dataColumns = dataColumns;
-    this.table = table;
-    this.storerInfo = storerInfo;
-    this.partitionColumns = partitionColumns;
-  }
-
-  /**
-   * Gets the value of databaseName
-   * @return the databaseName
-   */
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  /**
-   * Gets the value of tableName
-   * @return the tableName
-   */
-  public String getTableName() {
-    return tableName;
-  }
-
-  /**
-   * @return schema of data columns as defined in the metastore
-   */
-  public HCatSchema getDataColumns() {
-    return dataColumns;
-  }
-
-  /**
-   * @return schema of partition columns
-   */
-  public HCatSchema getPartitionColumns() {
-    return partitionColumns;
-  }
-
-  /**
-   * @return the storerInfo
-   */
-  public StorerInfo getStorerInfo() {
-    return storerInfo;
-  }
-
-  public String getTableLocation() {
-      return table.getSd().getLocation();
-  }
-
-  /**
-   * minimize dependency on hive classes so this is package private
-   * this should eventually no longer be used
-   * @return hive metastore representation of table
-   */
-  Table getTable() {
-    return table;
-  }
-
-  /**
-   * create an HCatTableInfo instance from the supplied Hive Table instance
-   * @param table to create an instance from
-   * @return HCatTableInfo
-   * @throws IOException
-   */
-  static HCatTableInfo valueOf(Table table) throws IOException {
-    // Explicitly use {@link org.apache.hadoop.hive.ql.metadata.Table} when getting the schema,
-    // but store {@link org.apache.hadoop.hive.metastore.api.Table} as this class is serialized
-    // into the job conf.
-    org.apache.hadoop.hive.ql.metadata.Table mTable =
-        new org.apache.hadoop.hive.ql.metadata.Table(table);
-    HCatSchema schema = HCatUtil.extractSchema(mTable);
-    StorerInfo storerInfo =
-        InternalUtil.extractStorerInfo(table.getSd(), table.getParameters());
-    HCatSchema partitionColumns = HCatUtil.getPartitionColumns(mTable);
-    return new HCatTableInfo(table.getDbName(), table.getTableName(), schema,
-        partitionColumns, storerInfo, table);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    HCatTableInfo tableInfo = (HCatTableInfo) o;
-
-    if (dataColumns != null ? !dataColumns.equals(tableInfo.dataColumns) : tableInfo.dataColumns != null) return false;
-    if (databaseName != null ? !databaseName.equals(tableInfo.databaseName) : tableInfo.databaseName != null) return false;
-    if (partitionColumns != null ? !partitionColumns.equals(tableInfo.partitionColumns) : tableInfo.partitionColumns != null)
-      return false;
-    if (storerInfo != null ? !storerInfo.equals(tableInfo.storerInfo) : tableInfo.storerInfo != null) return false;
-    if (table != null ? !table.equals(tableInfo.table) : tableInfo.table != null) return false;
-    if (tableName != null ? !tableName.equals(tableInfo.tableName) : tableInfo.tableName != null) return false;
-
-    return true;
-  }
-
-
-  @Override
-  public int hashCode() {
-    int result = databaseName != null ? databaseName.hashCode() : 0;
-    result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
-    result = 31 * result + (dataColumns != null ? dataColumns.hashCode() : 0);
-    result = 31 * result + (partitionColumns != null ? partitionColumns.hashCode() : 0);
-    result = 31 * result + (table != null ? table.hashCode() : 0);
-    result = 31 * result + (storerInfo != null ? storerInfo.hashCode() : 0);
-    return result;
-  }
+    /** The db and table names */
+    private final String databaseName;
+    private final String tableName;
+
+    /** The table schema. */
+    private final HCatSchema dataColumns;
+    private final HCatSchema partitionColumns;
+
+    /** The table being written to */
+    private final Table table;
+
+    /** The storer info */
+    private StorerInfo storerInfo;
+
+    /**
+     * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
+     * for reading data from a table.
+     * @param databaseName the db name
+     * @param tableName the table name
+     * @param dataColumns schema of columns which contain data
+     * @param partitionColumns schema of partition columns
+     * @param storerInfo information about storage descriptor
+     * @param table hive metastore table class
+     */
+    HCatTableInfo(
+        String databaseName,
+        String tableName,
+        HCatSchema dataColumns,
+        HCatSchema partitionColumns,
+        StorerInfo storerInfo,
+        Table table) {
+        this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+        this.tableName = tableName;
+        this.dataColumns = dataColumns;
+        this.table = table;
+        this.storerInfo = storerInfo;
+        this.partitionColumns = partitionColumns;
+    }
+
+    /**
+     * Gets the value of databaseName
+     * @return the databaseName
+     */
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    /**
+     * Gets the value of tableName
+     * @return the tableName
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * @return schema of data columns as defined in the metastore
+     */
+    public HCatSchema getDataColumns() {
+        return dataColumns;
+    }
+
+    /**
+     * @return schema of partition columns
+     */
+    public HCatSchema getPartitionColumns() {
+        return partitionColumns;
+    }
+
+    /**
+     * @return the storerInfo
+     */
+    public StorerInfo getStorerInfo() {
+        return storerInfo;
+    }
+
+    public String getTableLocation() {
+        return table.getSd().getLocation();
+    }
+
+    /**
+     * minimize dependency on hive classes so this is package private
+     * this should eventually no longer be used
+     * @return hive metastore representation of table
+     */
+    Table getTable() {
+        return table;
+    }
+
+    /**
+     * create an HCatTableInfo instance from the supplied Hive Table instance
+     * @param table to create an instance from
+     * @return HCatTableInfo
+     * @throws IOException
+     */
+    static HCatTableInfo valueOf(Table table) throws IOException {
+        // Explicitly use {@link org.apache.hadoop.hive.ql.metadata.Table} when getting the schema,
+        // but store {@link org.apache.hadoop.hive.metastore.api.Table} as this class is serialized
+        // into the job conf.
+        org.apache.hadoop.hive.ql.metadata.Table mTable =
+            new org.apache.hadoop.hive.ql.metadata.Table(table);
+        HCatSchema schema = HCatUtil.extractSchema(mTable);
+        StorerInfo storerInfo =
+            InternalUtil.extractStorerInfo(table.getSd(), table.getParameters());
+        HCatSchema partitionColumns = HCatUtil.getPartitionColumns(mTable);
+        return new HCatTableInfo(table.getDbName(), table.getTableName(), schema,
+            partitionColumns, storerInfo, table);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        HCatTableInfo tableInfo = (HCatTableInfo) o;
+
+        if (dataColumns != null ? !dataColumns.equals(tableInfo.dataColumns) : tableInfo.dataColumns != null)
+            return false;
+        if (databaseName != null ? !databaseName.equals(tableInfo.databaseName) : tableInfo.databaseName != null)
+            return false;
+        if (partitionColumns != null ? !partitionColumns.equals(tableInfo.partitionColumns) : tableInfo.partitionColumns != null)
+            return false;
+        if (storerInfo != null ? !storerInfo.equals(tableInfo.storerInfo) : tableInfo.storerInfo != null) return false;
+        if (table != null ? !table.equals(tableInfo.table) : tableInfo.table != null) return false;
+        if (tableName != null ? !tableName.equals(tableInfo.tableName) : tableInfo.tableName != null) return false;
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = databaseName != null ? databaseName.hashCode() : 0;
+        result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+        result = 31 * result + (dataColumns != null ? dataColumns.hashCode() : 0);
+        result = 31 * result + (partitionColumns != null ? partitionColumns.hashCode() : 0);
+        result = 31 * result + (table != null ? table.hashCode() : 0);
+        result = 31 * result + (storerInfo != null ? storerInfo.hashCode() : 0);
+        return result;
+    }
 
 }
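
For context, a minimal sketch (not part of the commit) of how an HCatTableInfo is typically inspected once it has been resolved, for example via InputJobInfo.getTableInfo() after input setup. Only getters shown above are used; the inputJobInfo variable is assumed to be an already-populated InputJobInfo:

    HCatTableInfo tableInfo = inputJobInfo.getTableInfo();   // assumed non-null here
    System.out.println("Reading " + tableInfo.getDatabaseName() + "." + tableInfo.getTableName()
        + " stored at " + tableInfo.getTableLocation());
    for (HCatFieldSchema field : tableInfo.getDataColumns().getFields()) {
        System.out.println("data column: " + field.getName() + " " + field.getTypeString());
    }
    for (HCatFieldSchema field : tableInfo.getPartitionColumns().getFields()) {
        System.out.println("partition column: " + field.getName());
    }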
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Mon Sep 10 23:28:55 2012
@@ -50,136 +50,136 @@ public class InitializeInput {
 
     private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class);
 
-  /**
-   * Set the input to use for the Job. This queries the metadata server with the specified
-   * partition predicates, gets the matching partitions, and puts the information in the job
-   * configuration object.
-   *
-   * To ensure a known InputJobInfo state, only the database name, table name, filter, and
-   * properties are preserved. All other modifications to the given InputJobInfo are discarded.
-   *
-   * After calling setInput, InputJobInfo can be retrieved from the job configuration as follows:
-   * {code}
-   * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
-   *     job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
-   * {code}
-   *
-   * @param job the job object
-   * @param theirInputJobInfo information on the Input to read
-   * @throws Exception
-   */
-  public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
-    InputJobInfo inputJobInfo = InputJobInfo.create(
-        theirInputJobInfo.getDatabaseName(),
-        theirInputJobInfo.getTableName(),
-        theirInputJobInfo.getFilter());
-    inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
-    job.getConfiguration().set(
-        HCatConstants.HCAT_KEY_JOB_INFO,
-        HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null)));
-  }
-
-  /**
-   * Returns the given InputJobInfo after populating with data queried from the metadata service.
-   */
-  private static InputJobInfo getInputJobInfo(
-      Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
-
-    HiveMetaStoreClient client = null;
-    HiveConf hiveConf = null;
-    try {
-      if (job != null){
-        hiveConf = HCatUtil.getHiveConf(job.getConfiguration());
-      } else {
-        hiveConf = new HiveConf(HCatInputFormat.class);
-      }
-      client = HCatUtil.getHiveClient(hiveConf);
-      Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
-          inputJobInfo.getTableName());
-
-      List<PartInfo> partInfoList = new ArrayList<PartInfo>();
-
-      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
-      if( table.getPartitionKeys().size() != 0 ) {
-        //Partitioned table
-        List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
-                                                              inputJobInfo.getTableName(),
-                                                              inputJobInfo.getFilter(),
-                                                              (short) -1);
-
-        // Default to 100,000 partitions if hcat.metastore.maxpartitions is not defined
-        int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
-        if (parts != null && parts.size() > maxPart) {
-          throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size());
-        }
+    /**
+     * Set the input to use for the Job. This queries the metadata server with the specified
+     * partition predicates, gets the matching partitions, and puts the information in the job
+     * configuration object.
+     *
+     * To ensure a known InputJobInfo state, only the database name, table name, filter, and
+     * properties are preserved. All other modifications to the given InputJobInfo are discarded.
+     *
+     * After calling setInput, InputJobInfo can be retrieved from the job configuration as follows:
+     * {code}
+     * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
+     *     job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+     * {code}
+     *
+     * @param job the job object
+     * @param theirInputJobInfo information on the Input to read
+     * @throws Exception
+     */
+    public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
+        InputJobInfo inputJobInfo = InputJobInfo.create(
+            theirInputJobInfo.getDatabaseName(),
+            theirInputJobInfo.getTableName(),
+            theirInputJobInfo.getFilter());
+        inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
+        job.getConfiguration().set(
+            HCatConstants.HCAT_KEY_JOB_INFO,
+            HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null)));
+    }
 
-        // populate partition info
-        for (Partition ptn : parts){
-          HCatSchema schema = HCatUtil.extractSchema(
-              new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
-          PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
-              ptn.getParameters(), job.getConfiguration(), inputJobInfo);
-          partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
-          partInfoList.add(partInfo);
+    /**
+     * Returns the given InputJobInfo after populating with data queried from the metadata service.
+     */
+    private static InputJobInfo getInputJobInfo(
+        Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
+
+        HiveMetaStoreClient client = null;
+        HiveConf hiveConf = null;
+        try {
+            if (job != null) {
+                hiveConf = HCatUtil.getHiveConf(job.getConfiguration());
+            } else {
+                hiveConf = new HiveConf(HCatInputFormat.class);
+            }
+            client = HCatUtil.getHiveClient(hiveConf);
+            Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
+                inputJobInfo.getTableName());
+
+            List<PartInfo> partInfoList = new ArrayList<PartInfo>();
+
+            inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
+            if (table.getPartitionKeys().size() != 0) {
+                //Partitioned table
+                List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
+                    inputJobInfo.getTableName(),
+                    inputJobInfo.getFilter(),
+                    (short) -1);
+
+                // Default to 100,000 partitions if hcat.metastore.maxpartitions is not defined
+                int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
+                if (parts != null && parts.size() > maxPart) {
+                    throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size());
+                }
+
+                // populate partition info
+                for (Partition ptn : parts) {
+                    HCatSchema schema = HCatUtil.extractSchema(
+                        new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
+                    PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
+                        ptn.getParameters(), job.getConfiguration(), inputJobInfo);
+                    partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
+                    partInfoList.add(partInfo);
+                }
+
+            } else {
+                //Non partitioned table
+                HCatSchema schema = HCatUtil.extractSchema(table);
+                PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
+                    table.getParameters(), job.getConfiguration(), inputJobInfo);
+                partInfo.setPartitionValues(new HashMap<String, String>());
+                partInfoList.add(partInfo);
+            }
+            inputJobInfo.setPartitions(partInfoList);
+
+            return inputJobInfo;
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
         }
 
-      }else{
-        //Non partitioned table
-        HCatSchema schema = HCatUtil.extractSchema(table);
-        PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
-            table.getParameters(), job.getConfiguration(), inputJobInfo);
-        partInfo.setPartitionValues(new HashMap<String,String>());
-        partInfoList.add(partInfo);
-      }
-      inputJobInfo.setPartitions(partInfoList);
-
-      return inputJobInfo;
-    } finally {
-      HCatUtil.closeHiveClientQuietly(client);
     }
 
-  }
+    private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException {
+        List<String> values = ptn.getValues();
+        if (values.size() != table.getPartitionKeys().size()) {
+            throw new IOException("Partition values in partition inconsistent with table definition, table "
+                + table.getTableName() + " has "
+                + table.getPartitionKeys().size()
+                + " partition keys, partition has " + values.size() + " partition values");
+        }
 
-  private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
-    List<String> values = ptn.getValues();
-    if( values.size() != table.getPartitionKeys().size() ) {
-      throw new IOException("Partition values in partition inconsistent with table definition, table "
-          + table.getTableName() + " has "
-          + table.getPartitionKeys().size()
-          + " partition keys, partition has " + values.size() + " partition values");
-    }
+        Map<String, String> ptnKeyValues = new HashMap<String, String>();
 
-    Map<String,String> ptnKeyValues = new HashMap<String,String>();
+        int i = 0;
+        for (FieldSchema schema : table.getPartitionKeys()) {
+            // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
+            ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+            i++;
+        }
 
-    int i = 0;
-    for(FieldSchema schema : table.getPartitionKeys()) {
-      // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
-      ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
-      i++;
+        return ptnKeyValues;
     }
 
-    return ptnKeyValues;
-  }
-
-  private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd,
-      Map<String,String> parameters, Configuration conf,
-      InputJobInfo inputJobInfo) throws IOException{
+    private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd,
+                                            Map<String, String> parameters, Configuration conf,
+                                            InputJobInfo inputJobInfo) throws IOException {
 
-    StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters);
+        StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);
 
-    Properties hcatProperties = new Properties();
-    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, storerInfo);
+        Properties hcatProperties = new Properties();
+        HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, storerInfo);
 
-    // copy the properties from storageHandler to jobProperties
-    Map<String, String>jobProperties = HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
+        // copy the properties from storageHandler to jobProperties
+        Map<String, String> jobProperties = HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
 
-    for (String key : parameters.keySet()){
-        hcatProperties.put(key, parameters.get(key));
+        for (String key : parameters.keySet()) {
+            hcatProperties.put(key, parameters.get(key));
+        }
+        // FIXME
+        // Bloating partinfo with inputJobInfo is not good
+        return new PartInfo(schema, storageHandler, sd.getLocation(),
+            hcatProperties, jobProperties, inputJobInfo.getTableInfo());
     }
-    // FIXME
-    // Bloating partinfo with inputJobInfo is not good
-    return new PartInfo(schema, storageHandler, sd.getLocation(),
-        hcatProperties, jobProperties, inputJobInfo.getTableInfo());
-  }
 
 }
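
A usage sketch (not part of the commit) exercising the setInput() contract documented above; job is assumed to be an already configured org.apache.hadoop.mapreduce.Job, "page_views" is a hypothetical table, and "ds" a hypothetical partition column (applications usually go through HCatInputFormat rather than calling this class directly):

    InitializeInput.setInput(job, InputJobInfo.create("default", "page_views", "ds='2012-09-10'"));

    InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
        job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
    System.out.println("table: " + inputInfo.getTableInfo().getTableName()
        + ", matching partitions: " + inputInfo.getPartitions().size());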

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java Mon Sep 10 23:28:55 2012
@@ -24,114 +24,114 @@ import java.util.List;
 import java.util.Properties;
 
 /** The class used to serialize and store the information read from the metadata server */
-public class InputJobInfo implements Serializable{
+public class InputJobInfo implements Serializable {
 
-  /** The serialization version */
-  private static final long serialVersionUID = 1L;
+    /** The serialization version */
+    private static final long serialVersionUID = 1L;
 
-  /** The db and table names. */
-  private final String databaseName;
-  private final String tableName;
-
-  /** meta information of the table to be read from */
-  private HCatTableInfo tableInfo;
-
-  /** The partition filter */
-  private String filter;
-
-  /** The list of partitions matching the filter. */
-  private List<PartInfo> partitions;
-
-  /** implementation specific job properties */
-  private Properties properties;
-
-  /**
-   * Initializes a new InputJobInfo
-   * for reading data from a table.
-   * @param databaseName the db name
-   * @param tableName the table name
-   * @param filter the partition filter
-   */
-
-  public static InputJobInfo create(String databaseName,
-      String tableName,
-      String filter) {
-    return new InputJobInfo(databaseName, tableName, filter);
-  }
-
-  
-  private InputJobInfo(String databaseName,
-                       String tableName,
-                       String filter) {
-    this.databaseName = (databaseName == null) ? 
-                        MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
-    this.tableName = tableName;
-    this.filter = filter;
-    this.properties = new Properties();
-  }
-
-  /**
-   * Gets the value of databaseName
-   * @return the databaseName
-   */
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  /**
-   * Gets the value of tableName
-   * @return the tableName
-   */
-  public String getTableName() {
-    return tableName;
-  }
-
-  /**
-   * Gets the table's meta information
-   * @return the HCatTableInfo
-   */
-  public HCatTableInfo getTableInfo() {
-    return tableInfo;
-  }
-
-  /**
-   * set the tableInfo instance
-   * this should be the same instance
-   * determined by this object's DatabaseName and TableName
-   * @param tableInfo
-   */
-  void setTableInfo(HCatTableInfo tableInfo) {
-    this.tableInfo = tableInfo;
-  }
-
-  /**
-   * Gets the value of partition filter
-   * @return the filter string
-   */
-  public String getFilter() {
-    return filter;
-  }
-
-  /**
-   * @return partition info
-   */
-  public List<PartInfo> getPartitions() {
-    return partitions;
-  }
-
-  /**
-   * @return partition info  list
-   */
-  void setPartitions(List<PartInfo> partitions) {
-    this.partitions = partitions;
-  }
-
-  /**
-   * Set/Get Property information to be passed down to *StorageHandler implementation
-   * put implementation specific storage handler configurations here
-   * @return the implementation specific job properties 
-   */
-  public Properties getProperties() {
-    return properties;
-  }
+    /** The db and table names. */
+    private final String databaseName;
+    private final String tableName;
+
+    /** meta information of the table to be read from */
+    private HCatTableInfo tableInfo;
+
+    /** The partition filter */
+    private String filter;
+
+    /** The list of partitions matching the filter. */
+    private List<PartInfo> partitions;
+
+    /** implementation specific job properties */
+    private Properties properties;
+
+    /**
+     * Initializes a new InputJobInfo
+     * for reading data from a table.
+     * @param databaseName the db name
+     * @param tableName the table name
+     * @param filter the partition filter
+     */
+
+    public static InputJobInfo create(String databaseName,
+                                      String tableName,
+                                      String filter) {
+        return new InputJobInfo(databaseName, tableName, filter);
+    }
+
+
+    private InputJobInfo(String databaseName,
+                         String tableName,
+                         String filter) {
+        this.databaseName = (databaseName == null) ?
+            MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+        this.tableName = tableName;
+        this.filter = filter;
+        this.properties = new Properties();
+    }
+
+    /**
+     * Gets the value of databaseName
+     * @return the databaseName
+     */
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    /**
+     * Gets the value of tableName
+     * @return the tableName
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * Gets the table's meta information
+     * @return the HCatTableInfo
+     */
+    public HCatTableInfo getTableInfo() {
+        return tableInfo;
+    }
+
+    /**
+     * set the tableInfo instance
+     * this should be the same instance
+     * determined by this object's DatabaseName and TableName
+     * @param tableInfo
+     */
+    void setTableInfo(HCatTableInfo tableInfo) {
+        this.tableInfo = tableInfo;
+    }
+
+    /**
+     * Gets the value of partition filter
+     * @return the filter string
+     */
+    public String getFilter() {
+        return filter;
+    }
+
+    /**
+     * @return partition info
+     */
+    public List<PartInfo> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * @return partition info  list
+     */
+    void setPartitions(List<PartInfo> partitions) {
+        this.partitions = partitions;
+    }
+
+    /**
+     * Set/Get Property information to be passed down to *StorageHandler implementation
+     * put implementation specific storage handler configurations here
+     * @return the implementation specific job properties
+     */
+    public Properties getProperties() {
+        return properties;
+    }
 }
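
A short sketch (not part of the commit) of the factory method above. Passing null for the database name falls back to MetaStoreUtils.DEFAULT_DATABASE_NAME, as the private constructor shows; the table name and the property key below are hypothetical, and a null filter is assumed to mean "no partition predicate":

    InputJobInfo info = InputJobInfo.create(null, "web_logs", null);
    // implementation-specific storage handler settings travel via getProperties()
    info.getProperties().setProperty("my.storage.handler.option", "value");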

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Mon Sep 10 23:28:55 2012
@@ -57,132 +57,132 @@ class InternalUtil {
 
     static StorerInfo extractStorerInfo(StorageDescriptor sd, Map<String, String> properties) throws IOException {
         Properties hcatProperties = new Properties();
-        for (String key : properties.keySet()){
+        for (String key : properties.keySet()) {
             hcatProperties.put(key, properties.get(key));
         }
 
         // also populate with StorageDescriptor->SerDe.Parameters
-        for (Map.Entry<String, String>param :
+        for (Map.Entry<String, String> param :
             sd.getSerdeInfo().getParameters().entrySet()) {
-          hcatProperties.put(param.getKey(), param.getValue());
+            hcatProperties.put(param.getKey(), param.getValue());
         }
 
 
         return new StorerInfo(
-                sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(),
-                properties.get(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE),
-                hcatProperties);
-    }
-
-  static StructObjectInspector createStructObjectInspector(HCatSchema outputSchema) throws IOException {
-
-    if(outputSchema == null ) {
-      throw new IOException("Invalid output schema specified");
-    }
-
-    List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
-    List<String> fieldNames = new ArrayList<String>();
-
-    for(HCatFieldSchema hcatFieldSchema : outputSchema.getFields()) {
-      TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString(hcatFieldSchema.getTypeString());
-
-      fieldNames.add(hcatFieldSchema.getName());
-      fieldInspectors.add(getObjectInspector(type));
-    }
-
-    StructObjectInspector structInspector = ObjectInspectorFactory.
-        getStandardStructObjectInspector(fieldNames, fieldInspectors);
-    return structInspector;
-  }
-
-  private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
-
-    switch(type.getCategory()) {
-
-    case PRIMITIVE :
-      PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
-      return PrimitiveObjectInspectorFactory.
-        getPrimitiveJavaObjectInspector(primitiveType.getPrimitiveCategory());
-
-    case MAP :
-      MapTypeInfo mapType = (MapTypeInfo) type;
-      MapObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
-          getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
-      return mapInspector;
-
-    case LIST :
-      ListTypeInfo listType = (ListTypeInfo) type;
-      ListObjectInspector listInspector = ObjectInspectorFactory.getStandardListObjectInspector(
-          getObjectInspector(listType.getListElementTypeInfo()));
-      return listInspector;
-
-    case STRUCT :
-      StructTypeInfo structType = (StructTypeInfo) type;
-      List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
-
-      List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
-      for(TypeInfo fieldType : fieldTypes) {
-        fieldInspectors.add(getObjectInspector(fieldType));
-      }
-
-      StructObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-          structType.getAllStructFieldNames(), fieldInspectors);
-      return structInspector;
-
-    default :
-      throw new IOException("Unknown field schema type");
-    }
-  }
-
-  //TODO this has to find a better home; it's also hardcoded as the default in hive. It would be nice
-  // if the default were decided by the serde
-  static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo)
-      throws SerDeException {
-    serDe.initialize(conf, getSerdeProperties(jobInfo.getTableInfo(), jobInfo.getOutputSchema()));
-  }
-
-  static void initializeDeserializer(Deserializer deserializer, Configuration conf,
-      HCatTableInfo info, HCatSchema schema) throws SerDeException {
-    Properties props = getSerdeProperties(info, schema);
-    LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props);
-    deserializer.initialize(conf, props);
-  }
-
-  private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
-      throws SerDeException {
-    Properties props = new Properties();
-    List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
-    props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS,
-        MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
-    props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
-        MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
-
-    // setting these props to match LazySimpleSerde
-    props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_NULL_FORMAT, "\\N");
-    props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
-
-    //add props from params set in table schema
-    props.putAll(info.getStorerInfo().getProperties());
-
-    return props;
-  }
-
-static Reporter createReporter(TaskAttemptContext context) {
-      return new ProgressReporter(context);
-  }
-
-  /**
-   * Casts an InputSplit into a HCatSplit, providing a useful error message if the cast fails.
-   * @param split the InputSplit
-   * @return the HCatSplit
-   * @throws IOException
-   */
-  public static HCatSplit castToHCatSplit(InputSplit split) throws IOException {
-    if (split instanceof HCatSplit) {
-      return (HCatSplit) split;
-    } else {
-      throw new IOException("Split must be " + HCatSplit.class.getName()
-          + " but found " + split.getClass().getName());
+            sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(),
+            properties.get(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE),
+            hcatProperties);
+    }
+
+    static StructObjectInspector createStructObjectInspector(HCatSchema outputSchema) throws IOException {
+
+        if (outputSchema == null) {
+            throw new IOException("Invalid output schema specified");
+        }
+
+        List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+        List<String> fieldNames = new ArrayList<String>();
+
+        for (HCatFieldSchema hcatFieldSchema : outputSchema.getFields()) {
+            TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString(hcatFieldSchema.getTypeString());
+
+            fieldNames.add(hcatFieldSchema.getName());
+            fieldInspectors.add(getObjectInspector(type));
+        }
+
+        StructObjectInspector structInspector = ObjectInspectorFactory.
+            getStandardStructObjectInspector(fieldNames, fieldInspectors);
+        return structInspector;
+    }
+
+    private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+
+        switch (type.getCategory()) {
+
+        case PRIMITIVE:
+            PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+            return PrimitiveObjectInspectorFactory.
+                getPrimitiveJavaObjectInspector(primitiveType.getPrimitiveCategory());
+
+        case MAP:
+            MapTypeInfo mapType = (MapTypeInfo) type;
+            MapObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
+                getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
+            return mapInspector;
+
+        case LIST:
+            ListTypeInfo listType = (ListTypeInfo) type;
+            ListObjectInspector listInspector = ObjectInspectorFactory.getStandardListObjectInspector(
+                getObjectInspector(listType.getListElementTypeInfo()));
+            return listInspector;
+
+        case STRUCT:
+            StructTypeInfo structType = (StructTypeInfo) type;
+            List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
+
+            List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+            for (TypeInfo fieldType : fieldTypes) {
+                fieldInspectors.add(getObjectInspector(fieldType));
+            }
+
+            StructObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+                structType.getAllStructFieldNames(), fieldInspectors);
+            return structInspector;
+
+        default:
+            throw new IOException("Unknown field schema type");
+        }
+    }
+
+    //TODO this has to find a better home; it's also hardcoded as the default in hive. It would be nice
+    // if the default were decided by the serde
+    static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo)
+        throws SerDeException {
+        serDe.initialize(conf, getSerdeProperties(jobInfo.getTableInfo(), jobInfo.getOutputSchema()));
+    }
+
+    static void initializeDeserializer(Deserializer deserializer, Configuration conf,
+                                       HCatTableInfo info, HCatSchema schema) throws SerDeException {
+        Properties props = getSerdeProperties(info, schema);
+        LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props);
+        deserializer.initialize(conf, props);
+    }
+
+    private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
+        throws SerDeException {
+        Properties props = new Properties();
+        List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
+        props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS,
+            MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+        props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
+            MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+
+        // setting these props to match LazySimpleSerde
+        props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_NULL_FORMAT, "\\N");
+        props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+
+        //add props from params set in table schema
+        props.putAll(info.getStorerInfo().getProperties());
+
+        return props;
+    }
+
+    static Reporter createReporter(TaskAttemptContext context) {
+        return new ProgressReporter(context);
+    }
+
+    /**
+     * Casts an InputSplit into a HCatSplit, providing a useful error message if the cast fails.
+     * @param split the InputSplit
+     * @return the HCatSplit
+     * @throws IOException
+     */
+    public static HCatSplit castToHCatSplit(InputSplit split) throws IOException {
+        if (split instanceof HCatSplit) {
+            return (HCatSplit) split;
+        } else {
+            throw new IOException("Split must be " + HCatSplit.class.getName()
+                + " but found " + split.getClass().getName());
+        }
     }
-  }
 }
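
To make the recursion in getObjectInspector() above concrete, here is a standalone sketch (not part of the commit) that builds the same inspector structure by hand for the type string "map<string,array<int>>", using only Hive classes already referenced in this file:

    TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString("map<string,array<int>>");
    MapTypeInfo mapType = (MapTypeInfo) type;                                 // category MAP
    ListTypeInfo valueType = (ListTypeInfo) mapType.getMapValueTypeInfo();    // category LIST

    ObjectInspector keyInspector = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
        ((PrimitiveTypeInfo) mapType.getMapKeyTypeInfo()).getPrimitiveCategory());          // string
    ObjectInspector elementInspector = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
        ((PrimitiveTypeInfo) valueType.getListElementTypeInfo()).getPrimitiveCategory());   // int
    ObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
        keyInspector, ObjectInspectorFactory.getStandardListObjectInspector(elementInspector));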

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java Mon Sep 10 23:28:55 2012
@@ -207,7 +207,7 @@ public class MultiOutputFormat extends O
      * @throws InterruptedException
      */
     public static <K, V> void write(String alias, K key, V value, TaskInputOutputContext context)
-            throws IOException, InterruptedException {
+        throws IOException, InterruptedException {
         KeyValue<K, V> keyval = new KeyValue<K, V>(key, value);
         context.write(new Text(alias), keyval);
     }
@@ -227,14 +227,14 @@ public class MultiOutputFormat extends O
 
     @Override
     public RecordWriter<Writable, Writable> getRecordWriter(TaskAttemptContext context)
-            throws IOException,
-            InterruptedException {
+        throws IOException,
+        InterruptedException {
         return new MultiRecordWriter(context);
     }
 
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
-            InterruptedException {
+        InterruptedException {
         return new MultiOutputCommitter(context);
     }
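
A hedged usage sketch (not part of this commit): a mapper might route records to two aliases through the write() method shown in the first hunk above. The alias names are hypothetical, the per-alias OutputFormat configuration that MultiOutputFormat needs is assumed to have been done during job setup (it is outside these hunks), and imports are omitted:

    public class FanOutMapper extends Mapper<LongWritable, Text, Writable, Writable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            // every record goes to the "cleaned" alias
            MultiOutputFormat.write("cleaned", key, value, context);
            // records containing ERROR are additionally routed to the "errors" alias
            if (value.toString().contains("ERROR")) {
                MultiOutputFormat.write("errors", key, value, context);
            }
        }
    }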
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java Mon Sep 10 23:28:55 2012
@@ -38,7 +38,7 @@ abstract class OutputFormatContainer ext
     /**
      * @param of OutputFormat this instance will contain
      */
-    public OutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>,? super Writable> of) {
+    public OutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
         this.of = of;
     }
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Mon Sep 10 23:28:55 2012
@@ -31,238 +31,239 @@ import org.apache.hcatalog.data.schema.H
 /** The class used to serialize and store the output related information  */
 public class OutputJobInfo implements Serializable {
 
-  /** The db and table names. */
-  private final String databaseName;
-  private final String tableName;
-
-  /** The serialization version. */
-  private static final long serialVersionUID = 1L;
-
-  /** The table info provided by user. */
-  private HCatTableInfo tableInfo;
-
-  /** The output schema. This is given to us by the user. This won't contain any
-   * partition columns, even if the user has specified them.
-   */
-  private HCatSchema outputSchema;
-
-  /** The location of the partition being written */
-  private String location;
-
-  /** The partition values to publish to, if used for output*/
-  private Map<String, String> partitionValues;
-
-  private List<Integer> posOfPartCols;
-  private List<Integer> posOfDynPartCols;
-
-  private Properties properties;
-
-  private int maxDynamicPartitions;
-
-  /** List of keys for which values were not specified at write setup time, to be inferred at write time */
-  private List<String> dynamicPartitioningKeys;
-
-  private boolean harRequested;
-
-  /**
-   * Initializes a new OutputJobInfo instance
-   * for writing data to a table.
-   * @param databaseName the db name
-   * @param tableName the table name
-   * @param partitionValues The partition values to publish to; can be null or an empty Map to
-   * indicate a write to an unpartitioned table. For partitioned tables, this map should
-   * contain keys for all partition columns with corresponding values.
-   */
-  public static OutputJobInfo create(String databaseName,
-                                     String tableName,
-                                     Map<String, String> partitionValues) {
-    return new OutputJobInfo(databaseName,
-        tableName,
-        partitionValues);
-  }
-
-  private OutputJobInfo(String databaseName,
-                        String tableName,
-                        Map<String, String> partitionValues) {
-    this.databaseName =  (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
-    this.tableName = tableName;
-    this.partitionValues = partitionValues;
-    this.properties = new Properties();
-  }
-
-  /**
-   * @return the posOfPartCols
-   */
-  protected List<Integer> getPosOfPartCols() {
-    return posOfPartCols;
-  }
-
-  /**
-   * @return the posOfDynPartCols
-   */
-  protected List<Integer> getPosOfDynPartCols() {
-    return posOfDynPartCols;
-  }
-
-  /**
-   * @param posOfPartCols the posOfPartCols to set
-   */
-  protected void setPosOfPartCols(List<Integer> posOfPartCols) {
-    // sorting the list in the descending order so that deletes happen back-to-front
-    Collections.sort(posOfPartCols, new Comparator<Integer> () {
-      @Override
-      public int compare(Integer earlier, Integer later) {
-        return (earlier > later) ? -1 : (earlier.equals(later) ? 0 : 1);
-      }
-    });
-    this.posOfPartCols = posOfPartCols;
-  }
+    /** The db and table names. */
+    private final String databaseName;
+    private final String tableName;
+
+    /** The serialization version. */
+    private static final long serialVersionUID = 1L;
+
+    /** The table info provided by user. */
+    private HCatTableInfo tableInfo;
+
+    /** The output schema. This is given to us by the user. This won't contain any
+     * partition columns, even if the user has specified them.
+     */
+    private HCatSchema outputSchema;
+
+    /** The location of the partition being written */
+    private String location;
+
+    /** The partition values to publish to, if used for output*/
+    private Map<String, String> partitionValues;
+
+    private List<Integer> posOfPartCols;
+    private List<Integer> posOfDynPartCols;
+
+    private Properties properties;
+
+    private int maxDynamicPartitions;
+
+    /** List of keys for which values were not specified at write setup time, to be inferred at write time */
+    private List<String> dynamicPartitioningKeys;
+
+    private boolean harRequested;
+
+    /**
+     * Initializes a new OutputJobInfo instance
+     * for writing data to a table.
+     * @param databaseName the db name
+     * @param tableName the table name
+     * @param partitionValues The partition values to publish to; can be null or an empty Map to
+     * indicate a write to an unpartitioned table. For partitioned tables, this map should
+     * contain keys for all partition columns with corresponding values.
+     */
+    public static OutputJobInfo create(String databaseName,
+                                       String tableName,
+                                       Map<String, String> partitionValues) {
+        return new OutputJobInfo(databaseName,
+            tableName,
+            partitionValues);
+    }
 
-  /**
+    private OutputJobInfo(String databaseName,
+                          String tableName,
+                          Map<String, String> partitionValues) {
+        this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+        this.tableName = tableName;
+        this.partitionValues = partitionValues;
+        this.properties = new Properties();
+    }
+
+    /**
+     * @return the posOfPartCols
+     */
+    protected List<Integer> getPosOfPartCols() {
+        return posOfPartCols;
+    }
+
+    /**
+     * @return the posOfDynPartCols
+     */
+    protected List<Integer> getPosOfDynPartCols() {
+        return posOfDynPartCols;
+    }
+
+    /**
+     * @param posOfPartCols the posOfPartCols to set
+     */
+    protected void setPosOfPartCols(List<Integer> posOfPartCols) {
+        // sorting the list in the descending order so that deletes happen back-to-front
+        Collections.sort(posOfPartCols, new Comparator<Integer>() {
+            @Override
+            public int compare(Integer earlier, Integer later) {
+                return (earlier > later) ? -1 : (earlier.equals(later) ? 0 : 1);
+            }
+        });
+        this.posOfPartCols = posOfPartCols;
+    }
+
+    /**
      * @param posOfDynPartCols the posOfDynPartCols to set
      */
     protected void setPosOfDynPartCols(List<Integer> posOfDynPartCols) {
-      // Important - no sorting here! We retain order, it's used to match with values at runtime
-      this.posOfDynPartCols = posOfDynPartCols;
+        // Important - no sorting here! We retain order, it's used to match with values at runtime
+        this.posOfDynPartCols = posOfDynPartCols;
     }
 
-  /**
-   * @return the tableInfo
-   */
-  public HCatTableInfo getTableInfo() {
-    return tableInfo;
-  }
-
-  /**
-   * @return the outputSchema
-   */
-  public HCatSchema getOutputSchema() {
-    return outputSchema;
-  }
-
-  /**
-   * @param schema the outputSchema to set
-   */
-  public void setOutputSchema(HCatSchema schema) {
-    this.outputSchema = schema;
-  }
-
-  /**
-   * @return the location
-   */
-  public String getLocation() {
-    return location;
-  }
-
-  /**
-   * @param location location to write to
-   */
-  public void setLocation(String location) {
-    this.location = location;
-  }
-  /**
-   * Sets the value of partitionValues
-   * @param partitionValues the partition values to set
-   */
-  void setPartitionValues(Map<String, String>  partitionValues) {
-    this.partitionValues = partitionValues;
-  }
-
-  /**
-   * Gets the value of partitionValues
-   * @return the partitionValues
-   */
-  public Map<String, String> getPartitionValues() {
-    return partitionValues;
-  }
-
-  /**
-   * set the tableInfo instance
-   * this should be the same instance
-   * determined by this object's DatabaseName and TableName
-   * @param tableInfo
-   */
-  void setTableInfo(HCatTableInfo tableInfo) {
-    this.tableInfo = tableInfo;
-  }
-
-  /**
-   * @return database name of table to write to
-   */
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  /**
-   * @return name of table to write to
-   */
-  public String getTableName() {
-    return tableName;
-  }
-
-  /**
-   * Set/Get Property information to be passed down to *StorageHandler implementation
-   * put implementation specific storage handler configurations here
-   * @return the implementation specific job properties 
-   */
-  public Properties getProperties() {
-    return properties;
-  }
-
-  /**
-   * Set maximum number of allowable dynamic partitions
-   * @param maxDynamicPartitions
-   */
-  public void setMaximumDynamicPartitions(int maxDynamicPartitions){
-    this.maxDynamicPartitions = maxDynamicPartitions;
-  }
-
-  /**
-   * Returns maximum number of allowable dynamic partitions
-   * @return maximum number of allowable dynamic partitions
-   */
-  public int getMaxDynamicPartitions() {
-    return this.maxDynamicPartitions;
-  }
-
-  /**
-   * Sets whether or not hadoop archiving has been requested for this job
-   * @param harRequested
-   */
-  public void setHarRequested(boolean harRequested){
-    this.harRequested = harRequested;
-  }
-
-  /**
-   * Returns whether or not hadoop archiving has been requested for this job
-   * @return whether or not hadoop archiving has been requested for this job
-   */
-  public boolean getHarRequested() {
-    return this.harRequested;
-  }
-
-  /**
-   * Returns whether or not Dynamic Partitioning is used
-   * @return whether or not dynamic partitioning is currently enabled and used
-   */
-  public boolean isDynamicPartitioningUsed() {
-    return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
-  }
-
-  /**
-   * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys
-   * @param dynamicPartitioningKeys
-   */
-  public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
-    this.dynamicPartitioningKeys = dynamicPartitioningKeys;
-  }
-
-  public List<String> getDynamicPartitioningKeys(){
-    return this.dynamicPartitioningKeys;
-  }
+    /**
+     * @return the tableInfo
+     */
+    public HCatTableInfo getTableInfo() {
+        return tableInfo;
+    }
+
+    /**
+     * @return the outputSchema
+     */
+    public HCatSchema getOutputSchema() {
+        return outputSchema;
+    }
+
+    /**
+     * @param schema the outputSchema to set
+     */
+    public void setOutputSchema(HCatSchema schema) {
+        this.outputSchema = schema;
+    }
+
+    /**
+     * @return the location
+     */
+    public String getLocation() {
+        return location;
+    }
+
+    /**
+     * @param location location to write to
+     */
+    public void setLocation(String location) {
+        this.location = location;
+    }
+
+    /**
+     * Sets the value of partitionValues
+     * @param partitionValues the partition values to set
+     */
+    void setPartitionValues(Map<String, String> partitionValues) {
+        this.partitionValues = partitionValues;
+    }
+
+    /**
+     * Gets the value of partitionValues
+     * @return the partitionValues
+     */
+    public Map<String, String> getPartitionValues() {
+        return partitionValues;
+    }
+
+    /**
+     * Sets the tableInfo instance; this should be the same instance
+     * as the one determined by this object's databaseName and tableName.
+     * @param tableInfo the table information to set
+     */
+    void setTableInfo(HCatTableInfo tableInfo) {
+        this.tableInfo = tableInfo;
+    }
+
+    /**
+     * @return database name of table to write to
+     */
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    /**
+     * @return name of table to write to
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * Gets the property information to be passed down to the *StorageHandler implementation;
+     * implementation-specific storage handler configurations go here.
+     * @return the implementation-specific job properties
+     */
+    public Properties getProperties() {
+        return properties;
+    }
+
+    /**
+     * Set maximum number of allowable dynamic partitions
+     * @param maxDynamicPartitions the maximum number of dynamic partitions allowed
+     */
+    public void setMaximumDynamicPartitions(int maxDynamicPartitions) {
+        this.maxDynamicPartitions = maxDynamicPartitions;
+    }
+
+    /**
+     * Returns maximum number of allowable dynamic partitions
+     * @return maximum number of allowable dynamic partitions
+     */
+    public int getMaxDynamicPartitions() {
+        return this.maxDynamicPartitions;
+    }
+
+    /**
+     * Sets whether or not hadoop archiving has been requested for this job
+     * @param harRequested true if hadoop archiving has been requested for this job
+     */
+    public void setHarRequested(boolean harRequested) {
+        this.harRequested = harRequested;
+    }
+
+    /**
+     * Returns whether or not hadoop archiving has been requested for this job
+     * @return whether or not hadoop archiving has been requested for this job
+     */
+    public boolean getHarRequested() {
+        return this.harRequested;
+    }
+
+    /**
+     * Returns whether or not Dynamic Partitioning is used
+     * @return whether or not dynamic partitioning is currently enabled and used
+     */
+    public boolean isDynamicPartitioningUsed() {
+        return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
+    }
+
+    /**
+     * Sets the list of dynamic partitioning keys used when writing output without specifying all the partition keys
+     * @param dynamicPartitioningKeys the dynamic partitioning keys
+     */
+    public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
+        this.dynamicPartitioningKeys = dynamicPartitioningKeys;
+    }
+
+    public List<String> getDynamicPartitioningKeys() {
+        return this.dynamicPartitioningKeys;
+    }
 
 }
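
For orientation, a minimal sketch that exercises the writer-side accessors reindented above (illustrative only, not part of this patch; it assumes the class is OutputJobInfo, that its static create(db, table, partitionValues) factory is available, and the database/table names are placeholders):

    import java.util.Arrays;
    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    public class OutputJobInfoSketch {
        public static void main(String[] args) {
            // create(...) is assumed; null partition values since the keys below are dynamic
            OutputJobInfo jobInfo = OutputJobInfo.create("default", "web_logs", null);
            jobInfo.setMaximumDynamicPartitions(1000);               // cap dynamic partitions for the job
            jobInfo.setHarRequested(false);                          // no hadoop archiving
            jobInfo.setDynamicPartitioningKeys(Arrays.asList("ds", "region"));
            // dynamic partitioning counts as "used" once keys are registered
            System.out.println(jobInfo.isDynamicPartitioningUsed()   // true
                + " " + jobInfo.getMaxDynamicPartitions());          // 1000
        }
    }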

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java Mon Sep 10 23:28:55 2012
@@ -26,138 +26,138 @@ import org.apache.hcatalog.data.schema.H
 /** The Class used to serialize the partition information read from the metadata server that maps to a partition. */
 public class PartInfo implements Serializable {
 
-  /** The serialization version */
-  private static final long serialVersionUID = 1L;
+    /** The serialization version */
+    private static final long serialVersionUID = 1L;
 
-  /** The partition schema. */
-  private final HCatSchema partitionSchema;
+    /** The partition schema. */
+    private final HCatSchema partitionSchema;
 
-  /** The information about which input storage handler to use */
-  private final String storageHandlerClassName;
-  private final String inputFormatClassName;
-  private final String outputFormatClassName;
-  private final String serdeClassName;
-
-  /** HCat-specific properties set at the partition */
-  private final Properties hcatProperties;
-
-  /** The data location. */
-  private final String location;
-
-  /** The map of partition key names and their values. */
-  private Map<String,String> partitionValues;
-
-  /** Job properties associated with this parition */
-  Map<String,String> jobProperties;
-
-  /** the table info associated with this partition */
-  HCatTableInfo tableInfo;
-
-  /**
-   * Instantiates a new hcat partition info.
-   * @param partitionSchema the partition schema
-   * @param storageHandler the storage handler
-   * @param location the location
-   * @param hcatProperties hcat-specific properties at the partition
-   * @param jobProperties the job properties
-   * @param tableInfo the table information
-   */
-  public PartInfo(HCatSchema partitionSchema, HCatStorageHandler storageHandler,
-                  String location, Properties hcatProperties, 
-                  Map<String,String> jobProperties, HCatTableInfo tableInfo){
-    this.partitionSchema = partitionSchema;
-    this.location = location;
-    this.hcatProperties = hcatProperties;
-    this.jobProperties = jobProperties;
-    this.tableInfo = tableInfo;
-
-    this.storageHandlerClassName = storageHandler.getClass().getName();
-    this.inputFormatClassName = storageHandler.getInputFormatClass().getName();
-    this.serdeClassName = storageHandler.getSerDeClass().getName();
-    this.outputFormatClassName = storageHandler.getOutputFormatClass().getName();
-}
-
-  /**
-   * Gets the value of partitionSchema.
-   * @return the partitionSchema
-   */
-  public HCatSchema getPartitionSchema() {
-    return partitionSchema;
-  }
-
-  /**
-   * @return the storage handler class name
-   */
-  public String getStorageHandlerClassName() {
-    return storageHandlerClassName;
-  }
-
-  /**
-   * @return the inputFormatClassName
-   */
-  public String getInputFormatClassName() {
-    return inputFormatClassName;
-  }
-
-  /**
-   * @return the outputFormatClassName
-   */
-  public String getOutputFormatClassName() {
-    return outputFormatClassName;
-  }
-
-  /**
-   * @return the serdeClassName
-   */
-  public String getSerdeClassName() {
-    return serdeClassName;
-  }
-
-  /**
-   * Gets the input storage handler properties.
-   * @return HCat-specific properties set at the partition 
-   */
-  public Properties getInputStorageHandlerProperties() {
-    return hcatProperties;
-  }
-
-  /**
-   * Gets the value of location.
-   * @return the location
-   */
-  public String getLocation() {
-    return location;
-  }
-
-  /**
-   * Sets the partition values.
-   * @param partitionValues the new partition values
-   */
-  public void setPartitionValues(Map<String,String> partitionValues) {
-    this.partitionValues = partitionValues;
-  }
-
-  /**
-   * Gets the partition values.
-   * @return the partition values
-   */
-  public Map<String,String> getPartitionValues() {
-    return partitionValues;
-  }
-
-  /**
-   * Gets the job properties.
-   * @return a map of the job properties
-   */
-  public Map<String,String> getJobProperties() {
-    return jobProperties;
-  }
-
-  /**
-   * Gets the HCatalog table information.
-   * @return the table information
-   */
-  public HCatTableInfo getTableInfo() {
-    return tableInfo;
-  }
+    /** The information about which input storage handler to use */
+    private final String storageHandlerClassName;
+    private final String inputFormatClassName;
+    private final String outputFormatClassName;
+    private final String serdeClassName;
+
+    /** HCat-specific properties set at the partition */
+    private final Properties hcatProperties;
+
+    /** The data location. */
+    private final String location;
+
+    /** The map of partition key names and their values. */
+    private Map<String, String> partitionValues;
+
+    /** Job properties associated with this partition */
+    Map<String, String> jobProperties;
+
+    /** the table info associated with this partition */
+    HCatTableInfo tableInfo;
+
+    /**
+     * Instantiates a new hcat partition info.
+     * @param partitionSchema the partition schema
+     * @param storageHandler the storage handler
+     * @param location the location
+     * @param hcatProperties hcat-specific properties at the partition
+     * @param jobProperties the job properties
+     * @param tableInfo the table information
+     */
+    public PartInfo(HCatSchema partitionSchema, HCatStorageHandler storageHandler,
+                    String location, Properties hcatProperties,
+                    Map<String, String> jobProperties, HCatTableInfo tableInfo) {
+        this.partitionSchema = partitionSchema;
+        this.location = location;
+        this.hcatProperties = hcatProperties;
+        this.jobProperties = jobProperties;
+        this.tableInfo = tableInfo;
+
+        this.storageHandlerClassName = storageHandler.getClass().getName();
+        this.inputFormatClassName = storageHandler.getInputFormatClass().getName();
+        this.serdeClassName = storageHandler.getSerDeClass().getName();
+        this.outputFormatClassName = storageHandler.getOutputFormatClass().getName();
+    }
+
+    /**
+     * Gets the value of partitionSchema.
+     * @return the partitionSchema
+     */
+    public HCatSchema getPartitionSchema() {
+        return partitionSchema;
+    }
+
+    /**
+     * @return the storage handler class name
+     */
+    public String getStorageHandlerClassName() {
+        return storageHandlerClassName;
+    }
+
+    /**
+     * @return the inputFormatClassName
+     */
+    public String getInputFormatClassName() {
+        return inputFormatClassName;
+    }
+
+    /**
+     * @return the outputFormatClassName
+     */
+    public String getOutputFormatClassName() {
+        return outputFormatClassName;
+    }
+
+    /**
+     * @return the serdeClassName
+     */
+    public String getSerdeClassName() {
+        return serdeClassName;
+    }
+
+    /**
+     * Gets the input storage handler properties.
+     * @return HCat-specific properties set at the partition
+     */
+    public Properties getInputStorageHandlerProperties() {
+        return hcatProperties;
+    }
+
+    /**
+     * Gets the value of location.
+     * @return the location
+     */
+    public String getLocation() {
+        return location;
+    }
+
+    /**
+     * Sets the partition values.
+     * @param partitionValues the new partition values
+     */
+    public void setPartitionValues(Map<String, String> partitionValues) {
+        this.partitionValues = partitionValues;
+    }
+
+    /**
+     * Gets the partition values.
+     * @return the partition values
+     */
+    public Map<String, String> getPartitionValues() {
+        return partitionValues;
+    }
+
+    /**
+     * Gets the job properties.
+     * @return a map of the job properties
+     */
+    public Map<String, String> getJobProperties() {
+        return jobProperties;
+    }
+
+    /**
+     * Gets the HCatalog table information.
+     * @return the table information
+     */
+    public HCatTableInfo getTableInfo() {
+        return tableInfo;
+    }
 }
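
A small reader-side sketch of how a PartInfo handed out by the framework might be inspected, using only the getters shown above (illustrative; the logPartition helper is hypothetical):

    import java.util.Map;
    import org.apache.hcatalog.mapreduce.PartInfo;

    public class PartInfoSketch {
        // Hypothetical helper: dump the storage details of one partition.
        static void logPartition(PartInfo part) {
            System.out.println("location        = " + part.getLocation());
            System.out.println("input format    = " + part.getInputFormatClassName());
            System.out.println("output format   = " + part.getOutputFormatClassName());
            System.out.println("serde           = " + part.getSerdeClassName());
            System.out.println("storage handler = " + part.getStorageHandlerClassName());
            Map<String, String> values = part.getPartitionValues();
            if (values != null) {
                for (Map.Entry<String, String> e : values.entrySet()) {
                    System.out.println(e.getKey() + "=" + e.getValue());
                }
            }
        }
    }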

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java Mon Sep 10 23:28:55 2012
@@ -27,65 +27,65 @@ import org.apache.hadoop.mapreduce.TaskI
 
 class ProgressReporter extends StatusReporter implements Reporter {
 
-  private TaskInputOutputContext context = null;
-  private TaskAttemptContext taskAttemptContext = null;
+    private TaskInputOutputContext context = null;
+    private TaskAttemptContext taskAttemptContext = null;
 
-  public ProgressReporter(TaskAttemptContext context) {
-    if (context instanceof TaskInputOutputContext) {
-      this.context = (TaskInputOutputContext) context;
-    } else {
-      taskAttemptContext = context;
-    }
-  }
-
-  @Override
-  public void setStatus(String status) {
-    if (context != null) {
-      context.setStatus(status);
-    }
-  }
-
-  @Override
-  public Counters.Counter getCounter(Enum<?> name) {
-    return (context != null) ? (Counters.Counter) context.getCounter(name) : null;
-  }
-
-  @Override
-  public Counters.Counter getCounter(String group, String name) {
-    return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
-  }
-
-  @Override
-  public void incrCounter(Enum<?> key, long amount) {
-    if (context != null) {
-      context.getCounter(key).increment(amount);
-    }
-  }
-
-  @Override
-  public void incrCounter(String group, String counter, long amount) {
-    if (context != null) {
-      context.getCounter(group, counter).increment(amount);
-    }
-  }
-
-  @Override
-  public InputSplit getInputSplit() throws UnsupportedOperationException {
-    return null;
-  }
-
-  public float getProgress() {
-      /* Required to build against 0.23 Reporter and StatusReporter. */
-      /* TODO: determine the progress. */
-      return 0.0f;
-  }
-
-  @Override
-  public void progress() {
-    if (context != null) {
-      context.progress();
-    } else {
-      taskAttemptContext.progress();
+    public ProgressReporter(TaskAttemptContext context) {
+        if (context instanceof TaskInputOutputContext) {
+            this.context = (TaskInputOutputContext) context;
+        } else {
+            taskAttemptContext = context;
+        }
+    }
+
+    @Override
+    public void setStatus(String status) {
+        if (context != null) {
+            context.setStatus(status);
+        }
+    }
+
+    @Override
+    public Counters.Counter getCounter(Enum<?> name) {
+        return (context != null) ? (Counters.Counter) context.getCounter(name) : null;
+    }
+
+    @Override
+    public Counters.Counter getCounter(String group, String name) {
+        return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
+    }
+
+    @Override
+    public void incrCounter(Enum<?> key, long amount) {
+        if (context != null) {
+            context.getCounter(key).increment(amount);
+        }
+    }
+
+    @Override
+    public void incrCounter(String group, String counter, long amount) {
+        if (context != null) {
+            context.getCounter(group, counter).increment(amount);
+        }
+    }
+
+    @Override
+    public InputSplit getInputSplit() throws UnsupportedOperationException {
+        return null;
+    }
+
+    public float getProgress() {
+        /* Required to build against 0.23 Reporter and StatusReporter. */
+        /* TODO: determine the progress. */
+        return 0.0f;
+    }
+
+    @Override
+    public void progress() {
+        if (context != null) {
+            context.progress();
+        } else {
+            taskAttemptContext.progress();
+        }
     }
-  }
 }
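
For context, a sketch of how the reporter above can bridge old-API Reporter calls onto a new-API context (illustrative; ProgressReporter is package-private, so the sketch assumes code living in the same package):

    package org.apache.hcatalog.mapreduce;   // ProgressReporter is package-private

    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    class ProgressReporterSketch {
        // Works with either kind of context: status and counters are only forwarded
        // when the wrapped context is a TaskInputOutputContext; progress() always is.
        static void reportBatch(TaskAttemptContext context, long records) {
            ProgressReporter reporter = new ProgressReporter(context);
            reporter.setStatus("processed " + records + " records");
            reporter.incrCounter("hcat", "records", records);
            reporter.progress();
        }
    }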

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java Mon Sep 10 23:28:55 2012
@@ -44,134 +44,134 @@ import org.slf4j.LoggerFactory;
 
 final class Security {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HCatOutputFormat.class);
-  
-  // making sure this is not initialized unless needed
-  private static final class LazyHolder {
-    public static final Security INSTANCE = new Security();
-  }
-
-  public static Security getInstance() {
-    return LazyHolder.INSTANCE;
-  }
-
-  boolean isSecurityEnabled() {
-      try {
-          Method m = UserGroupInformation.class.getMethod("isSecurityEnabled");
-          return (Boolean)m.invoke(null, (Object[])null);
-      } catch (NoSuchMethodException e) {
-          LOG.info("Security is not supported by this version of hadoop.", e);
-      } catch (InvocationTargetException e) {
-          String msg = "Failed to call isSecurityEnabled()";
-          LOG.info(msg, e);
-          throw new IllegalStateException(msg,e);
-      } catch (IllegalAccessException e) {
-          String msg = "Failed to call isSecurityEnabled()";
-          LOG.info(msg, e);
-          throw new IllegalStateException(msg,e);
-      }
-      return false;
-  }
-
-  // a signature string to associate with a HCatTableInfo - essentially
-  // a concatenation of dbname, tablename and partition keyvalues.
-  String getTokenSignature(OutputJobInfo outputJobInfo) {
-    StringBuilder result = new StringBuilder("");
-    String dbName = outputJobInfo.getDatabaseName();
-    if(dbName != null) {
-      result.append(dbName);
-    }
-    String tableName = outputJobInfo.getTableName();
-    if(tableName != null) {
-      result.append("." + tableName);
-    }
-    Map<String, String> partValues = outputJobInfo.getPartitionValues();
-    if(partValues != null) {
-      for(Entry<String, String> entry: partValues.entrySet()) {
-        result.append("/");
-        result.append(entry.getKey());
-        result.append("=");
-        result.append(entry.getValue());
-      }
-
-    }
-    return result.toString();
-  }
-
-  void handleSecurity(
-      Job job, 
-      OutputJobInfo outputJobInfo,
-      HiveMetaStoreClient client, 
-      Configuration conf,
-      boolean harRequested)
-      throws IOException, MetaException, TException, Exception {
-    if(UserGroupInformation.isSecurityEnabled()){
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      // check if oozie has set up a hcat deleg. token - if so use it
-      TokenSelector<? extends TokenIdentifier> hiveTokenSelector = new DelegationTokenSelector();
-      //Oozie does not change the service field of the token
-      //hence by default token generation will have a value of "new Text("")"
-      //HiveClient will look for a use TokenSelector.selectToken() with service
-      //set to empty "Text" if hive.metastore.token.signature property is set to null
-      Token<? extends TokenIdentifier> hiveToken = hiveTokenSelector.selectToken(
-        new Text(), ugi.getTokens());
-      if(hiveToken == null) {
-        // we did not get token set up by oozie, let's get them ourselves here.
-        // we essentially get a token per unique Output HCatTableInfo - this is
-        // done because through Pig, setOutput() method is called multiple times
-        // We want to only get the token once per unique output HCatTableInfo -
-        // we cannot just get one token since in multi-query case (> 1 store in 1 job)
-        // or the case when a single pig script results in > 1 jobs, the single
-        // token will get cancelled by the output committer and the subsequent
-        // stores will fail - by tying the token with the concatenation of
-        // dbname, tablename and partition keyvalues of the output
-        // TableInfo, we can have as many tokens as there are stores and the TokenSelector
-        // will correctly pick the right tokens which the committer will use and
-        // cancel.
-        String tokenSignature = getTokenSignature(outputJobInfo);
-        // get delegation tokens from hcat server and store them into the "job"
-        // These will be used in to publish partitions to
-        // hcat normally in OutputCommitter.commitJob()
-        // when the JobTracker in Hadoop MapReduce starts supporting renewal of
-        // arbitrary tokens, the renewer should be the principal of the JobTracker
-        hiveToken = HCatUtil.extractThriftToken(client.getDelegationToken(ugi.getUserName()), tokenSignature);
-
-        if (harRequested){
-          TokenSelector<? extends TokenIdentifier> jtTokenSelector =
-            new org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSelector();
-          Token jtToken = jtTokenSelector.selectToken(org.apache.hadoop.security.SecurityUtil.buildTokenService(
-                      HCatHadoopShims.Instance.get().getResourceManagerAddress(conf)), ugi.getTokens());
-          if(jtToken == null) {
-            //we don't need to cancel this token as the TokenRenewer for JT tokens
-            //takes care of cancelling them
-            job.getCredentials().addToken(new Text("hcat jt token"),
-                HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
-          }
-        }
-        
-        job.getCredentials().addToken(new Text(ugi.getUserName() +"_"+ tokenSignature), hiveToken);
-        // this will be used by the outputcommitter to pass on to the metastore client
-        // which in turn will pass on to the TokenSelector so that it can select
-        // the right token.
-        job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
-      }
-    }
-  }
-
-  // we should cancel hcat token if it was acquired by hcat
-  // and not if it was supplied (ie Oozie). In the latter
-  // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
-  void cancelToken(HiveMetaStoreClient client, JobContext context) throws IOException, MetaException {
-    String tokenStrForm = client.getTokenStrForm();
-    if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
-      try {
-        client.cancelDelegationToken(tokenStrForm);
-      } catch (TException e) {
-        String msg = "Failed to cancel delegation token";
-        LOG.error(msg,e);
-        throw new IOException(msg,e);
-      }
+    private static final Logger LOG = LoggerFactory.getLogger(HCatOutputFormat.class);
+
+    // making sure this is not initialized unless needed
+    private static final class LazyHolder {
+        public static final Security INSTANCE = new Security();
+    }
+
+    public static Security getInstance() {
+        return LazyHolder.INSTANCE;
+    }
+
+    boolean isSecurityEnabled() {
+        try {
+            Method m = UserGroupInformation.class.getMethod("isSecurityEnabled");
+            return (Boolean) m.invoke(null, (Object[]) null);
+        } catch (NoSuchMethodException e) {
+            LOG.info("Security is not supported by this version of hadoop.", e);
+        } catch (InvocationTargetException e) {
+            String msg = "Failed to call isSecurityEnabled()";
+            LOG.info(msg, e);
+            throw new IllegalStateException(msg, e);
+        } catch (IllegalAccessException e) {
+            String msg = "Failed to call isSecurityEnabled()";
+            LOG.info(msg, e);
+            throw new IllegalStateException(msg, e);
+        }
+        return false;
+    }
+
+    // a signature string to associate with an HCatTableInfo - essentially
+    // a concatenation of dbname, tablename and partition key-values.
+    String getTokenSignature(OutputJobInfo outputJobInfo) {
+        StringBuilder result = new StringBuilder("");
+        String dbName = outputJobInfo.getDatabaseName();
+        if (dbName != null) {
+            result.append(dbName);
+        }
+        String tableName = outputJobInfo.getTableName();
+        if (tableName != null) {
+            result.append("." + tableName);
+        }
+        Map<String, String> partValues = outputJobInfo.getPartitionValues();
+        if (partValues != null) {
+            for (Entry<String, String> entry : partValues.entrySet()) {
+                result.append("/");
+                result.append(entry.getKey());
+                result.append("=");
+                result.append(entry.getValue());
+            }
+
+        }
+        return result.toString();
+    }
+
+    void handleSecurity(
+        Job job,
+        OutputJobInfo outputJobInfo,
+        HiveMetaStoreClient client,
+        Configuration conf,
+        boolean harRequested)
+        throws IOException, MetaException, TException, Exception {
+        if (UserGroupInformation.isSecurityEnabled()) {
+            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+            // check if oozie has set up a hcat deleg. token - if so use it
+            TokenSelector<? extends TokenIdentifier> hiveTokenSelector = new DelegationTokenSelector();
+            // Oozie does not change the service field of the token,
+            // so by default the token's service will be the empty Text ("").
+            // HiveClient will look the token up via TokenSelector.selectToken() with the
+            // service set to an empty Text if hive.metastore.token.signature is set to null.
+            Token<? extends TokenIdentifier> hiveToken = hiveTokenSelector.selectToken(
+                new Text(), ugi.getTokens());
+            if (hiveToken == null) {
+                // we did not get a token set up by Oozie, so let's get one ourselves here.
+                // we essentially get a token per unique Output HCatTableInfo - this is
+                // done because through Pig, setOutput() method is called multiple times
+                // We want to only get the token once per unique output HCatTableInfo -
+                // we cannot just get one token since in multi-query case (> 1 store in 1 job)
+                // or the case when a single pig script results in > 1 jobs, the single
+                // token will get cancelled by the output committer and the subsequent
+                // stores will fail - by tying the token with the concatenation of
+                // dbname, tablename and partition keyvalues of the output
+                // TableInfo, we can have as many tokens as there are stores and the TokenSelector
+                // will correctly pick the right tokens which the committer will use and
+                // cancel.
+                String tokenSignature = getTokenSignature(outputJobInfo);
+                // get delegation tokens from hcat server and store them into the "job"
+                // These will be used to publish partitions to
+                // hcat, normally in OutputCommitter.commitJob().
+                // when the JobTracker in Hadoop MapReduce starts supporting renewal of
+                // arbitrary tokens, the renewer should be the principal of the JobTracker
+                hiveToken = HCatUtil.extractThriftToken(client.getDelegationToken(ugi.getUserName()), tokenSignature);
+
+                if (harRequested) {
+                    TokenSelector<? extends TokenIdentifier> jtTokenSelector =
+                        new org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSelector();
+                    Token jtToken = jtTokenSelector.selectToken(org.apache.hadoop.security.SecurityUtil.buildTokenService(
+                        HCatHadoopShims.Instance.get().getResourceManagerAddress(conf)), ugi.getTokens());
+                    if (jtToken == null) {
+                        //we don't need to cancel this token as the TokenRenewer for JT tokens
+                        //takes care of cancelling them
+                        job.getCredentials().addToken(new Text("hcat jt token"),
+                            HCatUtil.getJobTrackerDelegationToken(conf, ugi.getUserName()));
+                    }
+                }
+
+                job.getCredentials().addToken(new Text(ugi.getUserName() + "_" + tokenSignature), hiveToken);
+                // this will be used by the outputcommitter to pass on to the metastore client
+                // which in turn will pass on to the TokenSelector so that it can select
+                // the right token.
+                job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+            }
+        }
+    }
+
+    // we should cancel the hcat token if it was acquired by hcat,
+    // not if it was supplied (i.e., by Oozie). In the latter case
+    // the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set.
+    void cancelToken(HiveMetaStoreClient client, JobContext context) throws IOException, MetaException {
+        String tokenStrForm = client.getTokenStrForm();
+        if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+            try {
+                client.cancelDelegationToken(tokenStrForm);
+            } catch (TException e) {
+                String msg = "Failed to cancel delegation token";
+                LOG.error(msg, e);
+                throw new IOException(msg, e);
+            }
+        }
     }
-  }
 
 }
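
To make the per-store token signature concrete, a hedged sketch of what getTokenSignature() produces (illustrative; getTokenSignature() is package-private, the OutputJobInfo.create(...) factory is assumed, and the names are placeholders):

    package org.apache.hcatalog.mapreduce;   // getTokenSignature() is package-private

    import java.util.LinkedHashMap;
    import java.util.Map;

    class TokenSignatureSketch {
        public static void main(String[] args) {
            Map<String, String> parts = new LinkedHashMap<String, String>();
            parts.put("ds", "20120910");
            parts.put("region", "us");
            // db and table names are placeholders; create(...) is an assumed factory
            String sig = Security.getInstance()
                .getTokenSignature(OutputJobInfo.create("default", "web_logs", parts));
            System.out.println(sig);   // expected: default.web_logs/ds=20120910/region=us
        }
    }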

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java Mon Sep 10 23:28:55 2012
@@ -47,12 +47,12 @@ public class StorerInfo implements Seria
      * @param properties the properties for the storage handler
      */
     public StorerInfo(String ifClass, String ofClass, String serdeClass, String storageHandlerClass, Properties properties) {
-      super();
-      this.ifClass =ifClass;
-      this.ofClass = ofClass;
-      this.serdeClass = serdeClass;
-      this.storageHandlerClass = storageHandlerClass;
-      this.properties = properties;
+        super();
+        this.ifClass = ifClass;
+        this.ofClass = ofClass;
+        this.serdeClass = serdeClass;
+        this.storageHandlerClass = storageHandlerClass;
+        this.properties = properties;
     }
 
     /**
@@ -94,14 +94,14 @@ public class StorerInfo implements Seria
      * @return the storer properties
      */
     public Properties getProperties() {
-      return properties;
+        return properties;
     }
 
     /**
      * @param properties the storer properties to set 
      */
     public void setProperties(Properties properties) {
-      this.properties = properties;
+        this.properties = properties;
     }
 
 

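As a usage note for the constructor reformatted above, a sketch of building a StorerInfo by hand (illustrative; the RCFile/ColumnarSerDe class names and the property are examples only, not mandated by this patch):

    import java.util.Properties;
    import org.apache.hcatalog.mapreduce.StorerInfo;

    public class StorerInfoSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("hcat.rcfile.example", "true");   // hypothetical storer property
            StorerInfo storer = new StorerInfo(
                "org.apache.hadoop.hive.ql.io.RCFileInputFormat",
                "org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
                "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
                null,                                            // no custom storage handler
                props);
            System.out.println(storer.getProperties());
        }
    }
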
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java Mon Sep 10 23:28:55 2012
@@ -27,14 +27,14 @@ import org.apache.hcatalog.cli.SemanticA
 
 public class JavaAction {
 
-  public static void main(String[] args) throws Exception{
+    public static void main(String[] args) throws Exception {
 
-    HiveConf conf = new HiveConf();
-    conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
-    conf.setVar(ConfVars.SEMANTIC_ANALYZER_HOOK, HCatSemanticAnalyzer.class.getName());
-    conf.setBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL, true);
-    SessionState.start(new CliSessionState(conf));
-    new CliDriver().processLine(args[0]);
-  }
+        HiveConf conf = new HiveConf();
+        conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
+        conf.setVar(ConfVars.SEMANTIC_ANALYZER_HOOK, HCatSemanticAnalyzer.class.getName());
+        conf.setBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL, true);
+        SessionState.start(new CliSessionState(conf));
+        new CliDriver().processLine(args[0]);
+    }
 
 }
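
JavaAction is meant to be launched by an Oozie java action; a direct invocation would look roughly like the following (illustrative; the configuration path and the HQL string are placeholders):

    public class JavaActionSketch {
        public static void main(String[] args) throws Exception {
            // Oozie normally materializes the action configuration and sets this property;
            // the path used here is only a placeholder.
            System.setProperty("oozie.action.conf.xml", "/tmp/action-conf.xml");
            // args[0] is handed to CliDriver.processLine(), i.e. a semicolon-terminated HQL line.
            org.apache.hcatalog.oozie.JavaAction.main(new String[]{"SHOW TABLES;"});
        }
    }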