Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC

svn commit: r1520466 [7/18] - in /hive/trunk/hcatalog: core/src/main/java/org/apache/hcatalog/cli/ core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/ core/src/main/java/org/apache/hcatalog/common/ core/src/main/java/org/apache/hcatalog/data/ ...

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class that handles querying the metadata server using the MetaStoreClient. The list of
+ * partitions matching the partition filter is fetched from the server and the information is
+ * serialized and written into the JobContext configuration. The inputInfo is also updated with
+ * info required in the client process context.
+ */
+class InitializeInput {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class);
+
+    /**
+     * @see org.apache.hcatalog.mapreduce.InitializeInput#setInput(org.apache.hadoop.conf.Configuration, InputJobInfo)
+     */
+    public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
+        setInput(job.getConfiguration(), theirInputJobInfo);
+    }
+
+    /**
+     * Set the input to use for the Job. This queries the metadata server with the specified
+     * partition predicates, gets the matching partitions, and puts the information in the job
+     * configuration object.
+     *
+     * To ensure a known InputJobInfo state, only the database name, table name, filter, and
+     * properties are preserved. All other modifications to the given InputJobInfo are discarded.
+     *
+     * After calling setInput, InputJobInfo can be retrieved from the job configuration as follows:
+     * <pre>{@code
+     * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
+     *     job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+     * }</pre>
+     *
+     * @param conf the job Configuration object
+     * @param theirInputJobInfo information on the Input to read
+     * @throws Exception
+     */
+    public static void setInput(Configuration conf,
+                                InputJobInfo theirInputJobInfo) throws Exception {
+        InputJobInfo inputJobInfo = InputJobInfo.create(
+            theirInputJobInfo.getDatabaseName(),
+            theirInputJobInfo.getTableName(),
+            theirInputJobInfo.getFilter(),
+            theirInputJobInfo.getProperties());
+        conf.set(
+            HCatConstants.HCAT_KEY_JOB_INFO,
+            HCatUtil.serialize(getInputJobInfo(conf, inputJobInfo, null)));
+    }
+
+    /**
+     * Returns the given InputJobInfo after populating it with data queried from the metadata service.
+     */
+    private static InputJobInfo getInputJobInfo(
+        Configuration conf, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
+        HiveMetaStoreClient client = null;
+        HiveConf hiveConf = null;
+        try {
+            if (conf != null) {
+                hiveConf = HCatUtil.getHiveConf(conf);
+            } else {
+                hiveConf = new HiveConf(HCatInputFormat.class);
+            }
+            client = HCatUtil.getHiveClient(hiveConf);
+            Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
+                inputJobInfo.getTableName());
+
+            List<PartInfo> partInfoList = new ArrayList<PartInfo>();
+
+            inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
+            if (table.getPartitionKeys().size() != 0) {
+                //Partitioned table
+                List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
+                    inputJobInfo.getTableName(),
+                    inputJobInfo.getFilter(),
+                    (short) -1);
+
+                // Default to 100,000 partitions if hcat.metastore.maxpartitions is not defined
+                int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
+                if (parts != null && parts.size() > maxPart) {
+                    throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size());
+                }
+
+                // populate partition info
+                for (Partition ptn : parts) {
+                    HCatSchema schema = HCatUtil.extractSchema(
+                        new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
+                    PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
+                        ptn.getParameters(), conf, inputJobInfo);
+                    partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table, ptn));
+                    partInfoList.add(partInfo);
+                }
+
+            } else {
+                //Non partitioned table
+                HCatSchema schema = HCatUtil.extractSchema(table);
+                PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
+                    table.getParameters(), conf, inputJobInfo);
+                partInfo.setPartitionValues(new HashMap<String, String>());
+                partInfoList.add(partInfo);
+            }
+            inputJobInfo.setPartitions(partInfoList);
+
+            return inputJobInfo;
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
+        }
+
+    }
+
+    private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd,
+                                            Map<String, String> parameters, Configuration conf,
+                                            InputJobInfo inputJobInfo) throws IOException {
+
+        StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);
+
+        Properties hcatProperties = new Properties();
+        HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, storerInfo);
+
+        // copy the properties from storageHandler to jobProperties
+        Map<String, String> jobProperties = HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
+
+        for (String key : parameters.keySet()) {
+            hcatProperties.put(key, parameters.get(key));
+        }
+        // FIXME
+        // Bloating partinfo with inputJobInfo is not good
+        return new PartInfo(schema, storageHandler, sd.getLocation(),
+            hcatProperties, jobProperties, inputJobInfo.getTableInfo());
+    }
+
+}
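
For context, a minimal client-side sketch of how this class is typically exercised, assuming the HCatInputFormat.setInput(Job, InputJobInfo) entry point referenced in the InputJobInfo javadoc further down; the database name, table name and partition filter are illustrative, and a reachable metastore is required:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class HCatReadSetupSketch {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "hcat-read-sketch");

        // Database, table and partition filter are placeholders for this sketch.
        InputJobInfo request = InputJobInfo.create("default", "web_logs", "ds=\"2013-09-05\"", null);
        HCatInputFormat.setInput(job, request);

        // InitializeInput has now queried the metastore and serialized a populated copy of
        // the InputJobInfo (table info plus the matching partitions) into the job configuration.
        InputJobInfo populated = (InputJobInfo) HCatUtil.deserialize(
            job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
        System.out.println("partitions selected: " + populated.getPartitions().size());
    }
}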

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Container for metadata read from the metadata server.
+ * Prior to release 0.5, InputJobInfo was a key part of the public API, exposed directly
+ * to end-users as an argument to
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, InputJobInfo)}.
+ * Going forward, we plan on treating InputJobInfo as an implementation detail and will no
+ * longer expose it to end-users. Should you need to use InputJobInfo outside HCatalog itself,
+ * please contact the developer mailing list before depending on this class.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InputJobInfo implements Serializable {
+
+    /** The serialization version */
+    private static final long serialVersionUID = 1L;
+
+    /** The db and table names. */
+    private final String databaseName;
+    private final String tableName;
+
+    /** meta information of the table to be read from */
+    private HCatTableInfo tableInfo;
+
+    /** The partition filter */
+    private String filter;
+
+    /** The list of partitions matching the filter. */
+    transient private List<PartInfo> partitions;
+
+    /** implementation specific job properties */
+    private Properties properties;
+
+    /**
+     * Initializes a new InputJobInfo
+     * for reading data from a table.
+     * @param databaseName the db name
+     * @param tableName the table name
+     * @param filter the partition filter
+     * @param properties implementation specific job properties
+     */
+    public static InputJobInfo create(String databaseName,
+                                      String tableName,
+                                      String filter,
+                                      Properties properties) {
+        return new InputJobInfo(databaseName, tableName, filter, properties);
+    }
+
+    /**
+     * Initializes a new InputJobInfo
+     * for reading data from a table.
+     * @param databaseName the db name
+     * @param tableName the table name
+     * @param filter the partition filter
+     */
+    @Deprecated
+    public static InputJobInfo create(String databaseName,
+                                      String tableName,
+                                      String filter) {
+        return create(databaseName, tableName, filter, null);
+    }
+
+
+    private InputJobInfo(String databaseName,
+                         String tableName,
+                         String filter,
+                         Properties properties) {
+        this.databaseName = (databaseName == null) ?
+            MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+        this.tableName = tableName;
+        this.filter = filter;
+        this.properties = properties == null ? new Properties() : properties;
+    }
+
+    /**
+     * Gets the value of databaseName
+     * @return the databaseName
+     */
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    /**
+     * Gets the value of tableName
+     * @return the tableName
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * Gets the table's meta information
+     * @return the HCatTableInfo
+     */
+    public HCatTableInfo getTableInfo() {
+        return tableInfo;
+    }
+
+    /**
+     * Sets the tableInfo instance.
+     * This should be the same instance
+     * determined by this object's databaseName and tableName.
+     * @param tableInfo the table information
+     */
+    void setTableInfo(HCatTableInfo tableInfo) {
+        this.tableInfo = tableInfo;
+    }
+
+    /**
+     * Gets the value of partition filter
+     * @return the filter string
+     */
+    public String getFilter() {
+        return filter;
+    }
+
+    /**
+     * @return the list of partition info objects
+     */
+    public List<PartInfo> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * @param partitions the list of partition info objects
+     */
+    void setPartitions(List<PartInfo> partitions) {
+        this.partitions = partitions;
+    }
+
+    /**
+     * Gets the property information to be passed down to the *StorageHandler implementation.
+     * Implementation-specific storage handler configurations should be put here.
+     * @return the implementation specific job properties
+     */
+    public Properties getProperties() {
+        return properties;
+    }
+
+    /**
+     * Serialize this object, compressing the partitions which can exceed the
+     * allowed jobConf size.
+     * @see <a href="https://issues.apache.org/jira/browse/HCATALOG-453">HCATALOG-453</a>
+     */
+    private void writeObject(ObjectOutputStream oos)
+        throws IOException {
+        oos.defaultWriteObject();
+        Deflater def = new Deflater(Deflater.BEST_COMPRESSION);
+        ObjectOutputStream partInfoWriter =
+            new ObjectOutputStream(new DeflaterOutputStream(oos, def));
+        partInfoWriter.writeObject(partitions);
+        partInfoWriter.close();
+    }
+
+    /**
+     * Deserialize this object, decompressing the partitions which can exceed the
+     * allowed jobConf size.
+     * @see <a href="https://issues.apache.org/jira/browse/HCATALOG-453">HCATALOG-453</a>
+     */
+    @SuppressWarnings("unchecked")
+    private void readObject(ObjectInputStream ois)
+        throws IOException, ClassNotFoundException {
+        ois.defaultReadObject();
+        ObjectInputStream partInfoReader =
+            new ObjectInputStream(new InflaterInputStream(ois));
+        partitions = (List<PartInfo>)partInfoReader.readObject();
+    }
+}
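
The writeObject/readObject pair above compresses only the bulky partitions field (HCATALOG-453). A self-contained sketch of the same Deflater/Inflater round trip, using only JDK classes and a plain byte buffer in place of the outer object stream:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class CompressedListRoundTrip {
    public static void main(String[] args) throws Exception {
        List<String> partitions = new ArrayList<String>();
        for (int i = 0; i < 10000; i++) {
            partitions.add("ds=2013-09-05/part-" + i);
        }

        // Write path: mirror writeObject() by streaming the bulky field through a Deflater.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        Deflater def = new Deflater(Deflater.BEST_COMPRESSION);
        ObjectOutputStream partInfoWriter =
            new ObjectOutputStream(new DeflaterOutputStream(buffer, def));
        partInfoWriter.writeObject(partitions);
        partInfoWriter.close();

        // Read path: mirror readObject() by inflating before deserializing.
        ObjectInputStream partInfoReader = new ObjectInputStream(
            new InflaterInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        @SuppressWarnings("unchecked")
        List<String> restored = (List<String>) partInfoReader.readObject();

        System.out.println(restored.size() + " entries restored from "
            + buffer.size() + " compressed bytes");
    }
}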

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InternalUtil.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InternalUtil.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+class InternalUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(InternalUtil.class);
+
+    static StorerInfo extractStorerInfo(StorageDescriptor sd, Map<String, String> properties) throws IOException {
+        Properties hcatProperties = new Properties();
+        for (String key : properties.keySet()) {
+            hcatProperties.put(key, properties.get(key));
+        }
+
+        // also populate with StorageDescriptor->SerDe.Parameters
+        for (Map.Entry<String, String> param :
+            sd.getSerdeInfo().getParameters().entrySet()) {
+            hcatProperties.put(param.getKey(), param.getValue());
+        }
+
+
+        return new StorerInfo(
+            sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(),
+            properties.get(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE),
+            hcatProperties);
+    }
+
+    static StructObjectInspector createStructObjectInspector(HCatSchema outputSchema) throws IOException {
+
+        if (outputSchema == null) {
+            throw new IOException("Invalid output schema specified");
+        }
+
+        List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+        List<String> fieldNames = new ArrayList<String>();
+
+        for (HCatFieldSchema hcatFieldSchema : outputSchema.getFields()) {
+            TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString(hcatFieldSchema.getTypeString());
+
+            fieldNames.add(hcatFieldSchema.getName());
+            fieldInspectors.add(getObjectInspector(type));
+        }
+
+        StructObjectInspector structInspector = ObjectInspectorFactory.
+            getStandardStructObjectInspector(fieldNames, fieldInspectors);
+        return structInspector;
+    }
+
+    private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+
+        switch (type.getCategory()) {
+
+        case PRIMITIVE:
+            PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+            return PrimitiveObjectInspectorFactory.
+                getPrimitiveJavaObjectInspector(primitiveType.getPrimitiveCategory());
+
+        case MAP:
+            MapTypeInfo mapType = (MapTypeInfo) type;
+            MapObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
+                getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
+            return mapInspector;
+
+        case LIST:
+            ListTypeInfo listType = (ListTypeInfo) type;
+            ListObjectInspector listInspector = ObjectInspectorFactory.getStandardListObjectInspector(
+                getObjectInspector(listType.getListElementTypeInfo()));
+            return listInspector;
+
+        case STRUCT:
+            StructTypeInfo structType = (StructTypeInfo) type;
+            List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
+
+            List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+            for (TypeInfo fieldType : fieldTypes) {
+                fieldInspectors.add(getObjectInspector(fieldType));
+            }
+
+            StructObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+                structType.getAllStructFieldNames(), fieldInspectors);
+            return structInspector;
+
+        default:
+            throw new IOException("Unknown field schema type");
+        }
+    }
+
+    // TODO: this has to find a better home; it's also hardcoded as the default in Hive. It would
+    // be nice if the default were decided by the SerDe.
+    static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo)
+        throws SerDeException {
+        serDe.initialize(conf, getSerdeProperties(jobInfo.getTableInfo(), jobInfo.getOutputSchema()));
+    }
+
+    static void initializeDeserializer(Deserializer deserializer, Configuration conf,
+                                       HCatTableInfo info, HCatSchema schema) throws SerDeException {
+        Properties props = getSerdeProperties(info, schema);
+        LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props);
+        deserializer.initialize(conf, props);
+    }
+
+    private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
+        throws SerDeException {
+        Properties props = new Properties();
+        List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
+        props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS,
+            MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+        props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES,
+            MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+
+        // setting these props to match LazySimpleSerde
+        props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT, "\\N");
+        props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, "1");
+
+        //add props from params set in table schema
+        props.putAll(info.getStorerInfo().getProperties());
+
+        return props;
+    }
+
+    static Reporter createReporter(TaskAttemptContext context) {
+        return new ProgressReporter(context);
+    }
+
+    /**
+     * Casts an InputSplit into a HCatSplit, providing a useful error message if the cast fails.
+     * @param split the InputSplit
+     * @return the HCatSplit
+     * @throws IOException
+     */
+    public static HCatSplit castToHCatSplit(InputSplit split) throws IOException {
+        if (split instanceof HCatSplit) {
+            return (HCatSplit) split;
+        } else {
+            throw new IOException("Split must be " + HCatSplit.class.getName()
+                + " but found " + split.getClass().getName());
+        }
+    }
+
+
+    static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn)
+        throws IOException {
+        List<String> values = ptn.getValues();
+        if (values.size() != table.getPartitionKeys().size()) {
+            throw new IOException(
+                    "Partition values in partition inconsistent with table definition, table "
+                            + table.getTableName() + " has "
+                            + table.getPartitionKeys().size()
+                            + " partition keys, partition has " + values.size()
+                            + " partition values");
+        }
+
+        Map<String, String> ptnKeyValues = new HashMap<String, String>();
+
+        int i = 0;
+        for (FieldSchema schema : table.getPartitionKeys()) {
+            // CONCERN : the way this mapping goes, the order *needs* to be
+            // preserved for table.getPartitionKeys() and ptn.getValues()
+            ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+            i++;
+        }
+
+        return ptnKeyValues;
+    }
+}
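
getObjectInspector() above recursively maps a Hive TypeInfo onto a standard ObjectInspector. The method is private, but the same factory calls can be exercised directly; a small sketch for a single list-typed column (the column type string is illustrative):

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class InspectorSketch {
    public static void main(String[] args) {
        // Parse the column type string the same way createStructObjectInspector() does.
        ListTypeInfo listType =
            (ListTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString("array<string>");

        // LIST branch: build the element inspector first (PRIMITIVE branch), then wrap it.
        PrimitiveTypeInfo elementType = (PrimitiveTypeInfo) listType.getListElementTypeInfo();
        ObjectInspector elementOI = PrimitiveObjectInspectorFactory
            .getPrimitiveJavaObjectInspector(elementType.getPrimitiveCategory());
        ObjectInspector listOI =
            ObjectInspectorFactory.getStandardListObjectInspector(elementOI);

        System.out.println(listOI.getTypeName()); // expected: array<string>
    }
}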

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,618 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The MultiOutputFormat class simplifies writing output data to multiple
+ * outputs.
+ * <p>
+ * Multiple output formats can be defined, each with its own
+ * <code>OutputFormat</code> class, its own key class and its own value class. Any
+ * configuration on these output format classes can be done without interfering
+ * with another output format's configuration.
+ * <p>
+ * Usage pattern for job submission:
+ *
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ *
+ * job.setMapperClass(WordCountMap.class);
+ * job.setReducerClass(WordCountReduce.class);
+ * job.setInputFormatClass(TextInputFormat.class);
+ * job.setOutputFormatClass(MultiOutputFormat.class);
+ * // Need not define OutputKeyClass and OutputValueClass. They default to
+ * // Writable.class
+ * job.setMapOutputKeyClass(Text.class);
+ * job.setMapOutputValueClass(IntWritable.class);
+ *
+ *
+ * // Create a JobConfigurer that will configure the job with the multiple
+ * // output format information.
+ * JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+ *
+ * // Defines additional single text based output 'text' for the job.
+ * // Any configuration for the defined OutputFormat should be done with
+ * // the Job obtained with configurer.getJob() method.
+ * configurer.addOutputFormat("text", TextOutputFormat.class,
+ *                 IntWritable.class, Text.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("text"), textOutDir);
+ *
+ * // Defines additional sequence-file based output 'sequence' for the job
+ * configurer.addOutputFormat("sequence", SequenceFileOutputFormat.class,
+ *                 Text.class, IntWritable.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("sequence"), seqOutDir);
+ * ...
+ * // configure method to be called on the JobConfigurer once all the
+ * // output formats have been defined and configured.
+ * configurer.configure();
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ *
+ * <pre>
+ * public class WordCountReduce extends
+ *         Reducer&lt;Text, IntWritable, Writable, Writable&gt; {
+ *
+ *     private IntWritable count = new IntWritable();
+ *
+ *     public void reduce(Text word, Iterator&lt;IntWritable&gt; values,
+ *             Context context)
+ *             throws IOException {
+ *         int sum = 0;
+ *         for (IntWritable val : values) {
+ *             sum += val.get();
+ *         }
+ *         count.set(sum);
+ *         MultiOutputFormat.write(&quot;text&quot;, count, word, context);
+ *         MultiOutputFormat.write(&quot;sequence&quot;, word, count, context);
+ *     }
+ *
+ * }
+ *
+ * </pre>
+ *
+ * Map-only jobs:
+ * <p>
+ * MultiOutputFormat.write("output", key, value, context); can be called from the map method
+ * of map-only jobs in the same way as from the reducer above.
+ *
+ */
+public class MultiOutputFormat extends OutputFormat<Writable, Writable> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MultiOutputFormat.class.getName());
+    private static final String MO_ALIASES = "mapreduce.multiout.aliases";
+    private static final String MO_ALIAS = "mapreduce.multiout.alias";
+    private static final String CONF_KEY_DELIM = "%%";
+    private static final String CONF_VALUE_DELIM = ";;";
+    private static final String COMMA_DELIM = ",";
+    private static final List<String> configsToOverride = new ArrayList<String>();
+    private static final Map<String, String> configsToMerge = new HashMap<String, String>();
+
+    static {
+        configsToOverride.add("mapred.output.dir");
+        configsToOverride.add(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_SYMLINK));
+        configsToMerge.put(JobContext.JOB_NAMENODES, COMMA_DELIM);
+        configsToMerge.put("tmpfiles", COMMA_DELIM);
+        configsToMerge.put("tmpjars", COMMA_DELIM);
+        configsToMerge.put("tmparchives", COMMA_DELIM);
+        configsToMerge.put(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_ARCHIVES), COMMA_DELIM);
+        configsToMerge.put(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_FILES), COMMA_DELIM);
+        String fileSep;
+        if (HCatUtil.isHadoop23()) {
+            fileSep = ",";
+        } else {
+            fileSep = System.getProperty("path.separator");
+        }
+        configsToMerge.put("mapred.job.classpath.archives", fileSep);
+        configsToMerge.put("mapred.job.classpath.files", fileSep);
+    }
+
+    /**
+     * Get a JobConfigurer instance that will support configuration of the job
+     * for multiple output formats.
+     *
+     * @param job the mapreduce job to be submitted
+     * @return JobConfigurer
+     */
+    public static JobConfigurer createConfigurer(Job job) {
+        return JobConfigurer.create(job);
+    }
+
+    /**
+     * Get the JobContext with the related OutputFormat configuration populated given the alias
+     * and the actual JobContext
+     * @param alias the name given to the OutputFormat configuration
+     * @param context the JobContext
+     * @return a copy of the JobContext with the alias configuration populated
+     */
+    public static JobContext getJobContext(String alias, JobContext context) {
+        String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
+        JobContext aliasContext = HCatHadoopShims.Instance.get().createJobContext(context.getConfiguration(), context.getJobID());
+        addToConfig(aliasConf, aliasContext.getConfiguration());
+        return aliasContext;
+    }
+
+    /**
+     * Get the TaskAttemptContext with the related OutputFormat configuration populated given the alias
+     * and the actual TaskAttemptContext
+     * @param alias the name given to the OutputFormat configuration
+     * @param context the Mapper or Reducer Context
+     * @return a copy of the TaskAttemptContext with the alias configuration populated
+     */
+    public static TaskAttemptContext getTaskAttemptContext(String alias, TaskAttemptContext context) {
+        String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
+        TaskAttemptContext aliasContext = HCatHadoopShims.Instance.get().createTaskAttemptContext(
+                context.getConfiguration(), context.getTaskAttemptID());
+        addToConfig(aliasConf, aliasContext.getConfiguration());
+        return aliasContext;
+    }
+
+    /**
+     * Write the output key and value using the OutputFormat defined by the
+     * alias.
+     *
+     * @param alias the name given to the OutputFormat configuration
+     * @param key the output key to be written
+     * @param value the output value to be written
+     * @param context the Mapper or Reducer Context
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public static <K, V> void write(String alias, K key, V value, TaskInputOutputContext context)
+        throws IOException, InterruptedException {
+        KeyValue<K, V> keyval = new KeyValue<K, V>(key, value);
+        context.write(new Text(alias), keyval);
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+        for (String alias : getOutputFormatAliases(context)) {
+            LOGGER.debug("Calling checkOutputSpecs for alias: " + alias);
+            JobContext aliasContext = getJobContext(alias, context);
+            OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
+            outputFormat.checkOutputSpecs(aliasContext);
+            // Copy credentials and any new config added back to JobContext
+            context.getCredentials().addAll(aliasContext.getCredentials());
+            setAliasConf(alias, context, aliasContext);
+        }
+    }
+
+    @Override
+    public RecordWriter<Writable, Writable> getRecordWriter(TaskAttemptContext context)
+        throws IOException,
+        InterruptedException {
+        return new MultiRecordWriter(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+        return new MultiOutputCommitter(context);
+    }
+
+    private static OutputFormat<?, ?> getOutputFormatInstance(JobContext context) {
+        OutputFormat<?, ?> outputFormat;
+        try {
+            outputFormat = ReflectionUtils.newInstance(context.getOutputFormatClass(),
+                    context.getConfiguration());
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException(e);
+        }
+        return outputFormat;
+    }
+
+    private static String[] getOutputFormatAliases(JobContext context) {
+        return context.getConfiguration().getStrings(MO_ALIASES);
+    }
+
+    /**
+     * Compare the aliasContext with userJob and add the differing configuration
+     * as mapreduce.multiout.alias.<aliasname>.conf to the userJob.
+     * <p>
+     * Merge configs like tmpjars, tmpfiles, tmparchives, and
+     * mapreduce.job.hdfs-servers that are directly handled by JobClient and add
+     * them to userJob.
+     * <p>
+     * Add mapred.output.dir config to userJob.
+     *
+     * @param alias alias name associated with an OutputFormat
+     * @param userJob reference to Job that the user is going to submit
+     * @param aliasContext JobContext populated with OutputFormat related
+     *            configuration.
+     */
+    private static void setAliasConf(String alias, JobContext userJob, JobContext aliasContext) {
+        Configuration userConf = userJob.getConfiguration();
+        StringBuilder builder = new StringBuilder();
+        for (Entry<String, String> conf : aliasContext.getConfiguration()) {
+            String key = conf.getKey();
+            String value = conf.getValue();
+            String jobValue = userConf.getRaw(key);
+            if (jobValue == null || !jobValue.equals(value)) {
+                if (configsToMerge.containsKey(key)) {
+                    String mergedValue = getMergedConfValue(jobValue, value, configsToMerge.get(key));
+                    userConf.set(key, mergedValue);
+                } else {
+                    if (configsToOverride.contains(key)) {
+                        userConf.set(key, value);
+                    }
+                    builder.append(key).append(CONF_KEY_DELIM).append(value)
+                            .append(CONF_VALUE_DELIM);
+                }
+            }
+        }
+        if (builder.length() > CONF_VALUE_DELIM.length()) {
+            builder.delete(builder.length() - CONF_VALUE_DELIM.length(), builder.length());
+            userConf.set(getAliasConfName(alias), builder.toString());
+        }
+    }
+
+    private static String getMergedConfValue(String originalValues, String newValues, String separator) {
+        if (originalValues == null) {
+            return newValues;
+        }
+        Set<String> mergedValues = new LinkedHashSet<String>();
+        mergedValues.addAll(Arrays.asList(StringUtils.split(originalValues, separator)));
+        mergedValues.addAll(Arrays.asList(StringUtils.split(newValues, separator)));
+        StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2);
+        for (String value : mergedValues) {
+            builder.append(value).append(separator);
+        }
+        return builder.substring(0, builder.length() - separator.length());
+    }
+
+    private static String getAliasConfName(String alias) {
+        return MO_ALIAS + "." + alias + ".conf";
+    }
+
+    private static void addToConfig(String aliasConf, Configuration conf) {
+        String[] config = aliasConf.split(CONF_KEY_DELIM + "|" + CONF_VALUE_DELIM);
+        for (int i = 0; i < config.length; i += 2) {
+            conf.set(config[i], config[i + 1]);
+        }
+    }
+
+    /**
+     * Class that supports configuration of the job for multiple output formats.
+     */
+    public static class JobConfigurer {
+
+        private final Job job;
+        private Map<String, Job> outputConfigs = new LinkedHashMap<String, Job>();
+
+        private JobConfigurer(Job job) {
+            this.job = job;
+        }
+
+        private static JobConfigurer create(Job job) {
+            JobConfigurer configurer = new JobConfigurer(job);
+            return configurer;
+        }
+
+        /**
+         * Add an OutputFormat configuration to the Job with an alias name.
+         *
+         * @param alias the name to be given to the OutputFormat configuration
+         * @param outputFormatClass OutputFormat class
+         * @param keyClass the key class for the output data
+         * @param valueClass the value class for the output data
+         * @throws IOException
+         */
+        public void addOutputFormat(String alias,
+                Class<? extends OutputFormat> outputFormatClass,
+                Class<?> keyClass, Class<?> valueClass) throws IOException {
+            Job copy = new Job(this.job.getConfiguration());
+            outputConfigs.put(alias, copy);
+            copy.setOutputFormatClass(outputFormatClass);
+            copy.setOutputKeyClass(keyClass);
+            copy.setOutputValueClass(valueClass);
+        }
+
+        /**
+         * Get the Job configuration for an OutputFormat defined by the alias
+         * name. The job returned by this method should be passed to the
+         * OutputFormat for any configuration instead of the Job that will be
+         * submitted to the JobClient.
+         *
+         * @param alias the name used for the OutputFormat during
+         *            addOutputFormat
+         * @return Job
+         */
+        public Job getJob(String alias) {
+            Job copy = outputConfigs.get(alias);
+            if (copy == null) {
+                throw new IllegalArgumentException("OutputFormat with alias " + alias
+                        + " has not been added");
+            }
+            return copy;
+        }
+
+        /**
+         * Configure the job with the multiple output formats added. This method
+         * should be called after all the output formats have been added and
+         * configured and before the job submission.
+         */
+        public void configure() {
+            StringBuilder aliases = new StringBuilder();
+            Configuration jobConf = job.getConfiguration();
+            for (Entry<String, Job> entry : outputConfigs.entrySet()) {
+                // Copy credentials
+                job.getCredentials().addAll(entry.getValue().getCredentials());
+                String alias = entry.getKey();
+                aliases.append(alias).append(COMMA_DELIM);
+                // Store the differing configuration for each alias in the job
+                // as a setting.
+                setAliasConf(alias, job, entry.getValue());
+            }
+            aliases.delete(aliases.length() - COMMA_DELIM.length(), aliases.length());
+            jobConf.set(MO_ALIASES, aliases.toString());
+        }
+
+    }
+
+    private static class KeyValue<K, V> implements Writable {
+        private final K key;
+        private final V value;
+
+        public KeyValue(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public K getKey() {
+            return key;
+        }
+
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            // Ignore. Not required as this will never be
+            // serialized/deserialized.
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException {
+            // Ignore. Not required as this will never be
+            // serialized/deserialized.
+        }
+    }
+
+    private static class MultiRecordWriter extends RecordWriter<Writable, Writable> {
+
+        private final Map<String, BaseRecordWriterContainer> baseRecordWriters;
+
+        public MultiRecordWriter(TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            baseRecordWriters = new LinkedHashMap<String, BaseRecordWriterContainer>();
+            String[] aliases = getOutputFormatAliases(context);
+            for (String alias : aliases) {
+                LOGGER.info("Creating record writer for alias: " + alias);
+                TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
+                Configuration aliasConf = aliasContext.getConfiguration();
+                // Create output directory if not already created.
+                String outDir = aliasConf.get("mapred.output.dir");
+                if (outDir != null) {
+                    Path outputDir = new Path(outDir);
+                    FileSystem fs = outputDir.getFileSystem(aliasConf);
+                    if (!fs.exists(outputDir)) {
+                        fs.mkdirs(outputDir);
+                    }
+                }
+                OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
+                baseRecordWriters.put(alias,
+                        new BaseRecordWriterContainer(outputFormat.getRecordWriter(aliasContext),
+                                aliasContext));
+            }
+        }
+
+        @Override
+        public void write(Writable key, Writable value) throws IOException, InterruptedException {
+            Text _key = (Text) key;
+            KeyValue _value = (KeyValue) value;
+            String alias = new String(_key.getBytes(), 0, _key.getLength());
+            BaseRecordWriterContainer baseRWContainer = baseRecordWriters.get(alias);
+            if (baseRWContainer == null) {
+                throw new IllegalArgumentException("OutputFormat with alias " + alias
+                        + " has not been added");
+            }
+            baseRWContainer.getRecordWriter().write(_value.getKey(), _value.getValue());
+        }
+
+        @Override
+        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+            for (Entry<String, BaseRecordWriterContainer> entry : baseRecordWriters.entrySet()) {
+                BaseRecordWriterContainer baseRWContainer = entry.getValue();
+                LOGGER.info("Closing record writer for alias: " + entry.getKey());
+                baseRWContainer.getRecordWriter().close(baseRWContainer.getContext());
+            }
+        }
+
+    }
+
+    private static class BaseRecordWriterContainer {
+
+        private final RecordWriter recordWriter;
+        private final TaskAttemptContext context;
+
+        public BaseRecordWriterContainer(RecordWriter recordWriter, TaskAttemptContext context) {
+            this.recordWriter = recordWriter;
+            this.context = context;
+        }
+
+        public RecordWriter getRecordWriter() {
+            return recordWriter;
+        }
+
+        public TaskAttemptContext getContext() {
+            return context;
+        }
+    }
+
+    public class MultiOutputCommitter extends OutputCommitter {
+
+        private final Map<String, BaseOutputCommitterContainer> outputCommitters;
+
+        public MultiOutputCommitter(TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            outputCommitters = new LinkedHashMap<String, MultiOutputFormat.BaseOutputCommitterContainer>();
+            String[] aliases = getOutputFormatAliases(context);
+            for (String alias : aliases) {
+                LOGGER.info("Creating output committer for alias: " + alias);
+                TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
+                OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext)
+                        .getOutputCommitter(aliasContext);
+                outputCommitters.put(alias,
+                        new BaseOutputCommitterContainer(baseCommitter, aliasContext));
+            }
+        }
+
+        @Override
+        public void setupJob(JobContext jobContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling setupJob for alias: " + alias);
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().setupJob(outputContainer.getContext());
+            }
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext taskContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling setupTask for alias: " + alias);
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().setupTask(outputContainer.getContext());
+            }
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+            boolean needTaskCommit = false;
+            for (String alias : outputCommitters.keySet()) {
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                needTaskCommit = needTaskCommit
+                        || outputContainer.getBaseCommitter().needsTaskCommit(
+                                outputContainer.getContext());
+            }
+            return needTaskCommit;
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext taskContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                OutputCommitter baseCommitter = outputContainer.getBaseCommitter();
+                TaskAttemptContext committerContext = outputContainer.getContext();
+                if (baseCommitter.needsTaskCommit(committerContext)) {
+                    LOGGER.info("Calling commitTask for alias: " + alias);
+                    baseCommitter.commitTask(committerContext);
+                }
+            }
+        }
+
+        @Override
+        public void abortTask(TaskAttemptContext taskContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling abortTask for alias: " + alias);
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().abortTask(outputContainer.getContext());
+            }
+        }
+
+        @Override
+        public void commitJob(JobContext jobContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling commitJob for alias: " + alias);
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().commitJob(outputContainer.getContext());
+            }
+        }
+
+        @Override
+        public void abortJob(JobContext jobContext, State state) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling abortJob for alias: " + alias);
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state);
+            }
+        }
+    }
+
+    private static class BaseOutputCommitterContainer {
+
+        private final OutputCommitter outputCommitter;
+        private final TaskAttemptContext context;
+
+        public BaseOutputCommitterContainer(OutputCommitter outputCommitter,
+                TaskAttemptContext context) {
+            this.outputCommitter = outputCommitter;
+            this.context = context;
+        }
+
+        public OutputCommitter getBaseCommitter() {
+            return outputCommitter;
+        }
+
+        public TaskAttemptContext getContext() {
+            return context;
+        }
+    }
+
+}
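
setAliasConf() records only the configuration entries that differ from the user's job, joined with the "%%" and ";;" delimiters, and addToConfig() later splits that string back into the per-alias configuration copy. A small sketch of that round trip; the keys and values are illustrative, not taken from a real job:

import org.apache.hadoop.conf.Configuration;

public class AliasConfSketch {
    public static void main(String[] args) {
        // Encoded form produced by setAliasConf(): key%%value;;key%%value
        String aliasConf = "mapred.output.dir%%/tmp/out/text;;"
            + "example.per.alias.setting%%42";

        // Decode it the way addToConfig() does.
        Configuration conf = new Configuration(false);
        String[] config = aliasConf.split("%%" + "|" + ";;");
        for (int i = 0; i < config.length; i += 2) {
            conf.set(config[i], config[i + 1]);
        }

        System.out.println(conf.get("mapred.output.dir"));          // /tmp/out/text
        System.out.println(conf.get("example.per.alias.setting"));  // 42
    }
}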

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+
+/**
+ *  This class will contain an implementation of an OutputCommitter.
+ *  See {@link OutputFormatContainer} for more information about containers.
+ */
+abstract class OutputCommitterContainer extends OutputCommitter {
+    private final org.apache.hadoop.mapred.OutputCommitter committer;
+
+    /**
+     * @param context current JobContext
+     * @param committer OutputCommitter that this instance will contain
+     */
+    public OutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter committer) {
+        this.committer = committer;
+    }
+
+    /**
+     * @return underlying OutputCommitter
+     */
+    public OutputCommitter getBaseOutputCommitter() {
+        return committer;
+    }
+
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ *  This container class is used to wrap OutputFormat implementations and augment them with
+ *  behavior necessary to work with HCatalog (e.g. metastore updates, HCatalog delegation tokens).
+ *  Containers are also used to provide storage-specific implementations of some HCatalog features
+ *  (e.g. dynamic partitioning). Hence users wishing to create storage-specific implementations of
+ *  HCatalog features should extend this class and override
+ *  HCatStorageHandler.getOutputFormatContainer(OutputFormat outputFormat) to return the implementation.
+ *  By default DefaultOutputFormatContainer is used, which implements only the bare minimum;
+ *  HCatalog features such as dynamic partitioning are not supported by it.
+ */
+abstract class OutputFormatContainer extends OutputFormat<WritableComparable<?>, HCatRecord> {
+    private org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> of;
+
+    /**
+     * @param of OutputFormat this instance will contain
+     */
+    public OutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
+        this.of = of;
+    }
+
+    /**
+     * @return underlying OutputFormat
+     */
+    public org.apache.hadoop.mapred.OutputFormat getBaseOutputFormat() {
+        return of;
+    }
+
+}

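For illustration only (not part of this patch): a minimal skeleton of a storage-specific container, following the javadoc above. Since OutputFormatContainer is package-private, the sketch would have to live in org.apache.hcatalog.mapreduce; the class name is made up, and the method bodies are stubs whose comments only describe what a real container such as DefaultOutputFormatContainer does.

    package org.apache.hcatalog.mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hcatalog.data.HCatRecord;

    // Hypothetical skeleton: shows which methods a storage-specific container must supply.
    class SketchOutputFormatContainer extends OutputFormatContainer {

        public SketchOutputFormatContainer(
            org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
            super(of);
        }

        @Override
        public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
            // a real container wraps the writer obtained from getBaseOutputFormat()
            // in a RecordWriterContainer that converts HCatRecords for the storage format
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
            // a real container validates the output table/partition recorded in the job conf
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
            // a real container returns an OutputCommitterContainer that also publishes
            // the partition to the metastore on commit
            throw new UnsupportedOperationException("sketch only");
        }
    }

A custom HCatStorageHandler would then return such a container from getOutputFormatContainer(OutputFormat outputFormat), as the javadoc above suggests.
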
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The class used to serialize and store the output-related information. */
+public class OutputJobInfo implements Serializable {
+
+    /** The db and table names. */
+    private final String databaseName;
+    private final String tableName;
+
+    /** The serialization version. */
+    private static final long serialVersionUID = 1L;
+
+    /** The table info provided by user. */
+    private HCatTableInfo tableInfo;
+
+    /** The output schema. This is given to us by the user. This won't contain any
+     * partition columns, even if the user has specified them.
+     */
+    private HCatSchema outputSchema;
+
+    /** The location of the partition being written */
+    private String location;
+
+    /** The partition values to publish to, if used for output*/
+    private Map<String, String> partitionValues;
+
+    private List<Integer> posOfPartCols;
+    private List<Integer> posOfDynPartCols;
+
+    private Properties properties;
+
+    private int maxDynamicPartitions;
+
+    /** List of keys for which values were not specified at write setup time, to be inferred at write time */
+    private List<String> dynamicPartitioningKeys;
+
+    private boolean harRequested;
+
+    /**
+     * Initializes a new OutputJobInfo instance
+     * for writing data to a table.
+     * @param databaseName the db name
+     * @param tableName the table name
+     * @param partitionValues The partition values to publish to; can be null or an empty Map to
+     * indicate a write to an unpartitioned table. For partitioned tables, this map should
+     * contain keys for all partition columns with corresponding values.
+     */
+    public static OutputJobInfo create(String databaseName,
+                                       String tableName,
+                                       Map<String, String> partitionValues) {
+        return new OutputJobInfo(databaseName,
+            tableName,
+            partitionValues);
+    }
+
+    private OutputJobInfo(String databaseName,
+                          String tableName,
+                          Map<String, String> partitionValues) {
+        this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+        this.tableName = tableName;
+        this.partitionValues = partitionValues;
+        this.properties = new Properties();
+    }
+
+    /**
+     * @return the posOfPartCols
+     */
+    protected List<Integer> getPosOfPartCols() {
+        return posOfPartCols;
+    }
+
+    /**
+     * @return the posOfDynPartCols
+     */
+    protected List<Integer> getPosOfDynPartCols() {
+        return posOfDynPartCols;
+    }
+
+    /**
+     * @param posOfPartCols the posOfPartCols to set
+     */
+    protected void setPosOfPartCols(List<Integer> posOfPartCols) {
+        // sorting the list in the descending order so that deletes happen back-to-front
+        Collections.sort(posOfPartCols, new Comparator<Integer>() {
+            @Override
+            public int compare(Integer earlier, Integer later) {
+                // compareTo avoids reference equality (==) on boxed Integers
+                return later.compareTo(earlier);
+            }
+        });
+        this.posOfPartCols = posOfPartCols;
+    }
+
+    /**
+     * @param posOfDynPartCols the posOfDynPartCols to set
+     */
+    protected void setPosOfDynPartCols(List<Integer> posOfDynPartCols) {
+        // Important - no sorting here! We retain order, it's used to match with values at runtime
+        this.posOfDynPartCols = posOfDynPartCols;
+    }
+
+    /**
+     * @return the tableInfo
+     */
+    public HCatTableInfo getTableInfo() {
+        return tableInfo;
+    }
+
+    /**
+     * @return the outputSchema
+     */
+    public HCatSchema getOutputSchema() {
+        return outputSchema;
+    }
+
+    /**
+     * @param schema the outputSchema to set
+     */
+    public void setOutputSchema(HCatSchema schema) {
+        this.outputSchema = schema;
+    }
+
+    /**
+     * @return the location
+     */
+    public String getLocation() {
+        return location;
+    }
+
+    /**
+     * @param location location to write to
+     */
+    public void setLocation(String location) {
+        this.location = location;
+    }
+
+    /**
+     * Sets the value of partitionValues
+     * @param partitionValues the partition values to set
+     */
+    void setPartitionValues(Map<String, String> partitionValues) {
+        this.partitionValues = partitionValues;
+    }
+
+    /**
+     * Gets the value of partitionValues
+     * @return the partitionValues
+     */
+    public Map<String, String> getPartitionValues() {
+        return partitionValues;
+    }
+
+    /**
+     * Sets the tableInfo instance. This should be the same instance
+     * determined by this object's databaseName and tableName.
+     * @param tableInfo
+     */
+    void setTableInfo(HCatTableInfo tableInfo) {
+        this.tableInfo = tableInfo;
+    }
+
+    /**
+     * @return database name of table to write to
+     */
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    /**
+     * @return name of table to write to
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * Gets the Properties instance passed down to the *StorageHandler implementation.
+     * Put implementation-specific storage handler configurations here.
+     * @return the implementation-specific job properties
+     */
+    public Properties getProperties() {
+        return properties;
+    }
+
+    /**
+     * Set maximum number of allowable dynamic partitions
+     * @param maxDynamicPartitions
+     */
+    public void setMaximumDynamicPartitions(int maxDynamicPartitions) {
+        this.maxDynamicPartitions = maxDynamicPartitions;
+    }
+
+    /**
+     * Returns maximum number of allowable dynamic partitions
+     * @return maximum number of allowable dynamic partitions
+     */
+    public int getMaxDynamicPartitions() {
+        return this.maxDynamicPartitions;
+    }
+
+    /**
+     * Sets whether or not hadoop archiving has been requested for this job
+     * @param harRequested
+     */
+    public void setHarRequested(boolean harRequested) {
+        this.harRequested = harRequested;
+    }
+
+    /**
+     * Returns whether or not hadoop archiving has been requested for this job
+     * @return whether or not hadoop archiving has been requested for this job
+     */
+    public boolean getHarRequested() {
+        return this.harRequested;
+    }
+
+    /**
+     * Returns whether or not Dynamic Partitioning is used
+     * @return whether or not dynamic partitioning is currently enabled and used
+     */
+    public boolean isDynamicPartitioningUsed() {
+        return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
+    }
+
+    /**
+     * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys
+     * @param dynamicPartitioningKeys
+     */
+    public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
+        this.dynamicPartitioningKeys = dynamicPartitioningKeys;
+    }
+
+    public List<String> getDynamicPartitioningKeys() {
+        return this.dynamicPartitioningKeys;
+    }
+
+}

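For illustration only (not part of this patch): a small usage sketch of the create()/getProperties() API above. The database, table, partition key, and property names are made up; in a real job the resulting OutputJobInfo is typically handed to HCatOutputFormat.setOutput().

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    public class OutputJobInfoExample {
        public static OutputJobInfo buildJobInfo() {
            // publish to the ds=2013-09-06 partition of web.clicks (hypothetical names);
            // pass null or an empty map instead to write to an unpartitioned table,
            // or leave some partition keys out to have them inferred dynamically
            Map<String, String> partitionValues = new HashMap<String, String>();
            partitionValues.put("ds", "2013-09-06");

            OutputJobInfo info = OutputJobInfo.create("web", "clicks", partitionValues);
            // implementation-specific storage handler settings travel in the properties
            // ("example.compression" is a made-up key for illustration)
            info.getProperties().setProperty("example.compression", "gzip");
            return info;
        }
    }
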
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The class used to serialize the information, read from the metadata server, that maps to a partition. */
+public class PartInfo implements Serializable {
+
+    /** The serialization version */
+    private static final long serialVersionUID = 1L;
+
+    /** The partition schema. */
+    private final HCatSchema partitionSchema;
+
+    /** The information about which input storage handler to use */
+    private final String storageHandlerClassName;
+    private final String inputFormatClassName;
+    private final String outputFormatClassName;
+    private final String serdeClassName;
+
+    /** HCat-specific properties set at the partition */
+    private final Properties hcatProperties;
+
+    /** The data location. */
+    private final String location;
+
+    /** The map of partition key names and their values. */
+    private Map<String, String> partitionValues;
+
+    /** Job properties associated with this partition */
+    Map<String, String> jobProperties;
+
+    /** the table info associated with this partition */
+    HCatTableInfo tableInfo;
+
+    /**
+     * Instantiates a new hcat partition info.
+     * @param partitionSchema the partition schema
+     * @param storageHandler the storage handler
+     * @param location the location
+     * @param hcatProperties hcat-specific properties at the partition
+     * @param jobProperties the job properties
+     * @param tableInfo the table information
+     */
+    public PartInfo(HCatSchema partitionSchema, HCatStorageHandler storageHandler,
+                    String location, Properties hcatProperties,
+                    Map<String, String> jobProperties, HCatTableInfo tableInfo) {
+        this.partitionSchema = partitionSchema;
+        this.location = location;
+        this.hcatProperties = hcatProperties;
+        this.jobProperties = jobProperties;
+        this.tableInfo = tableInfo;
+
+        this.storageHandlerClassName = storageHandler.getClass().getName();
+        this.inputFormatClassName = storageHandler.getInputFormatClass().getName();
+        this.serdeClassName = storageHandler.getSerDeClass().getName();
+        this.outputFormatClassName = storageHandler.getOutputFormatClass().getName();
+    }
+
+    /**
+     * Gets the value of partitionSchema.
+     * @return the partitionSchema
+     */
+    public HCatSchema getPartitionSchema() {
+        return partitionSchema;
+    }
+
+    /**
+     * @return the storage handler class name
+     */
+    public String getStorageHandlerClassName() {
+        return storageHandlerClassName;
+    }
+
+    /**
+     * @return the inputFormatClassName
+     */
+    public String getInputFormatClassName() {
+        return inputFormatClassName;
+    }
+
+    /**
+     * @return the outputFormatClassName
+     */
+    public String getOutputFormatClassName() {
+        return outputFormatClassName;
+    }
+
+    /**
+     * @return the serdeClassName
+     */
+    public String getSerdeClassName() {
+        return serdeClassName;
+    }
+
+    /**
+     * Gets the input storage handler properties.
+     * @return HCat-specific properties set at the partition
+     */
+    public Properties getInputStorageHandlerProperties() {
+        return hcatProperties;
+    }
+
+    /**
+     * Gets the value of location.
+     * @return the location
+     */
+    public String getLocation() {
+        return location;
+    }
+
+    /**
+     * Sets the partition values.
+     * @param partitionValues the new partition values
+     */
+    public void setPartitionValues(Map<String, String> partitionValues) {
+        this.partitionValues = partitionValues;
+    }
+
+    /**
+     * Gets the partition values.
+     * @return the partition values
+     */
+    public Map<String, String> getPartitionValues() {
+        return partitionValues;
+    }
+
+    /**
+     * Gets the job properties.
+     * @return a map of the job properties
+     */
+    public Map<String, String> getJobProperties() {
+        return jobProperties;
+    }
+
+    /**
+     * Gets the HCatalog table information.
+     * @return the table information
+     */
+    public HCatTableInfo getTableInfo() {
+        return tableInfo;
+    }
+}

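For illustration only (not part of this patch): a read-side sketch of the public getters above, e.g. for logging what a split points at. The class and method name are made up.

    import java.util.Map;

    import org.apache.hcatalog.mapreduce.PartInfo;

    public class PartInfoExample {
        // summarize the storage details a reader would need for one partition
        public static String describe(PartInfo partInfo) {
            StringBuilder sb = new StringBuilder();
            sb.append("location=").append(partInfo.getLocation());
            sb.append(", inputFormat=").append(partInfo.getInputFormatClassName());
            sb.append(", serde=").append(partInfo.getSerdeClassName());
            Map<String, String> values = partInfo.getPartitionValues();
            if (values != null) {
                sb.append(", partitionValues=").append(values);
            }
            return sb.toString();
        }
    }
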
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/ProgressReporter.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/ProgressReporter.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/ProgressReporter.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+class ProgressReporter extends StatusReporter implements Reporter {
+
+    private TaskInputOutputContext context = null;
+    private TaskAttemptContext taskAttemptContext = null;
+
+    public ProgressReporter(TaskAttemptContext context) {
+        if (context instanceof TaskInputOutputContext) {
+            this.context = (TaskInputOutputContext) context;
+        } else {
+            taskAttemptContext = context;
+        }
+    }
+
+    @Override
+    public void setStatus(String status) {
+        if (context != null) {
+            context.setStatus(status);
+        }
+    }
+
+    @Override
+    public Counters.Counter getCounter(Enum<?> name) {
+        return (context != null) ? (Counters.Counter) context.getCounter(name) : null;
+    }
+
+    @Override
+    public Counters.Counter getCounter(String group, String name) {
+        return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
+    }
+
+    @Override
+    public void incrCounter(Enum<?> key, long amount) {
+        if (context != null) {
+            context.getCounter(key).increment(amount);
+        }
+    }
+
+    @Override
+    public void incrCounter(String group, String counter, long amount) {
+        if (context != null) {
+            context.getCounter(group, counter).increment(amount);
+        }
+    }
+
+    @Override
+    public InputSplit getInputSplit() throws UnsupportedOperationException {
+        return null;
+    }
+
+    public float getProgress() {
+        /* Required to build against 0.23 Reporter and StatusReporter. */
+        /* TODO: determine the progress. */
+        return 0.0f;
+    }
+
+    @Override
+    public void progress() {
+        if (context != null) {
+            context.progress();
+        } else {
+            taskAttemptContext.progress();
+        }
+    }
+}

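For illustration only (not part of this patch): ProgressReporter adapts a new-API context to the old-API Reporter interface, which is what a wrapped org.apache.hadoop.mapred.RecordWriter expects, for example when it is closed. Since the class is package-private, a sketch like this would live in org.apache.hcatalog.mapreduce; the Text key/value types and class name are arbitrary.

    package org.apache.hcatalog.mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    class ProgressReporterExample {
        // old-API writers take a Reporter on close(); wrapping the new-API context
        // lets progress and counter updates still reach the framework
        static void closeWriter(RecordWriter<Text, Text> writer, TaskAttemptContext context)
            throws IOException {
            writer.close(new ProgressReporter(context));
        }
    }
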
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ *  This container class wraps an underlying org.apache.hadoop.mapred.RecordWriter implementation.
+ *  See {@link OutputFormatContainer} for more information about containers.
+ */
+abstract class RecordWriterContainer extends RecordWriter<WritableComparable<?>, HCatRecord> {
+
+    private final org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter;
+
+    /**
+     * @param context current TaskAttemptContext
+     * @param baseRecordWriter RecordWriter that this instance will contain
+     */
+    public RecordWriterContainer(TaskAttemptContext context,
+                                 org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) {
+        this.baseRecordWriter = baseRecordWriter;
+    }
+
+    /**
+     * @return underlying RecordWriter
+     */
+    public org.apache.hadoop.mapred.RecordWriter getBaseRecordWriter() {
+        return baseRecordWriter;
+    }
+
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/Security.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/Security.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/Security.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/Security.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class Security {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Security.class);
+
+    // making sure this is not initialized unless needed
+    private static final class LazyHolder {
+        public static final Security INSTANCE = new Security();
+    }
+
+    public static Security getInstance() {
+        return LazyHolder.INSTANCE;
+    }
+
+    boolean isSecurityEnabled() {
+        try {
+            Method m = UserGroupInformation.class.getMethod("isSecurityEnabled");
+            return (Boolean) m.invoke(null, (Object[]) null);
+        } catch (NoSuchMethodException e) {
+            LOG.info("Security is not supported by this version of hadoop.", e);
+        } catch (InvocationTargetException e) {
+            String msg = "Failed to call isSecurityEnabled()";
+            LOG.info(msg, e);
+            throw new IllegalStateException(msg, e);
+        } catch (IllegalAccessException e) {
+            String msg = "Failed to call isSecurityEnabled()";
+            LOG.info(msg, e);
+            throw new IllegalStateException(msg, e);
+        }
+        return false;
+    }
+
+    // a signature string to associate with an HCatTableInfo - essentially
+    // a concatenation of dbname, tablename and partition keyvalues.
+    String getTokenSignature(OutputJobInfo outputJobInfo) {
+        StringBuilder result = new StringBuilder("");
+        String dbName = outputJobInfo.getDatabaseName();
+        if (dbName != null) {
+            result.append(dbName);
+        }
+        String tableName = outputJobInfo.getTableName();
+        if (tableName != null) {
+            result.append("." + tableName);
+        }
+        Map<String, String> partValues = outputJobInfo.getPartitionValues();
+        if (partValues != null) {
+            for (Entry<String, String> entry : partValues.entrySet()) {
+                result.append("/");
+                result.append(entry.getKey());
+                result.append("=");
+                result.append(entry.getValue());
+            }
+
+        }
+        return result.toString();
+    }
+
+    void handleSecurity(
+        Credentials credentials,
+        OutputJobInfo outputJobInfo,
+        HiveMetaStoreClient client,
+        Configuration conf,
+        boolean harRequested)
+        throws IOException, MetaException, TException, Exception {
+        if (UserGroupInformation.isSecurityEnabled()) {
+            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+            // check if oozie has set up a hcat deleg. token - if so use it
+            TokenSelector<? extends TokenIdentifier> hiveTokenSelector = new DelegationTokenSelector();
+            //Oozie does not change the service field of the token,
+            //hence by default the token's service will be "new Text("")".
+            //HiveClient will then use TokenSelector.selectToken() with the service
+            //set to an empty Text if the hive.metastore.token.signature property is null.
+            Token<? extends TokenIdentifier> hiveToken = hiveTokenSelector.selectToken(
+                new Text(), ugi.getTokens());
+            if (hiveToken == null) {
+                // we did not get a token set up by Oozie, so let's get one ourselves here.
+                // we essentially get a token per unique output HCatTableInfo - this is
+                // done because, through Pig, setOutput() is called multiple times.
+                // We want to get the token only once per unique output HCatTableInfo -
+                // we cannot just get one token, since in the multi-query case (> 1 store in 1 job),
+                // or when a single pig script results in > 1 job, the single
+                // token would get cancelled by the output committer and the subsequent
+                // stores would fail. By tying the token to the concatenation of
+                // dbname, tablename and partition keyvalues of the output
+                // TableInfo, we can have as many tokens as there are stores, and the TokenSelector
+                // will correctly pick the right token for the committer to use and
+                // cancel.
+                String tokenSignature = getTokenSignature(outputJobInfo);
+                // get a delegation token from the hcat server and store it in the "job".
+                // It will be used to publish partitions to
+                // hcat, normally in OutputCommitter.commitJob().
+                // When the JobTracker in Hadoop MapReduce starts supporting renewal of
+                // arbitrary tokens, the renewer should be the principal of the JobTracker.
+                hiveToken = HCatUtil.extractThriftToken(client.getDelegationToken(ugi.getUserName()), tokenSignature);
+
+                if (harRequested) {
+                    TokenSelector<? extends TokenIdentifier> jtTokenSelector =
+                        new org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSelector();
+                    Token jtToken = jtTokenSelector.selectToken(org.apache.hadoop.security.SecurityUtil.buildTokenService(
+                        HCatHadoopShims.Instance.get().getResourceManagerAddress(conf)), ugi.getTokens());
+                    if (jtToken == null) {
+                        //we don't need to cancel this token as the TokenRenewer for JT tokens
+                        //takes care of cancelling them
+                        credentials.addToken(
+                            new Text("hcat jt token"),
+                            HCatUtil.getJobTrackerDelegationToken(conf, ugi.getUserName())
+                        );
+                    }
+                }
+
+                credentials.addToken(new Text(ugi.getUserName() + "_" + tokenSignature), hiveToken);
+                // this will be used by the outputcommitter to pass on to the metastore client
+                // which in turn will pass on to the TokenSelector so that it can select
+                // the right token.
+                conf.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+            }
+        }
+    }
+
+    void handleSecurity(
+        Job job,
+        OutputJobInfo outputJobInfo,
+        HiveMetaStoreClient client,
+        Configuration conf,
+        boolean harRequested)
+        throws IOException, MetaException, TException, Exception {
+        handleSecurity(job.getCredentials(), outputJobInfo, client, conf, harRequested);
+    }
+
+    // we should cancel the hcat token if it was acquired by hcat,
+    // and not if it was supplied (i.e. by Oozie). In the latter
+    // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set.
+    void cancelToken(HiveMetaStoreClient client, JobContext context) throws IOException, MetaException {
+        String tokenStrForm = client.getTokenStrForm();
+        if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+            try {
+                client.cancelDelegationToken(tokenStrForm);
+            } catch (TException e) {
+                String msg = "Failed to cancel delegation token";
+                LOG.error(msg, e);
+                throw new IOException(msg, e);
+            }
+        }
+    }
+
+}
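
For illustration only (not part of this patch): the signature built by getTokenSignature() above is just dbname, tablename, and partition key/values concatenated, so two stores to different partitions end up with different tokens. A standalone sketch of the resulting string, using made-up values and a LinkedHashMap only to get deterministic ordering:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TokenSignatureDemo {
        public static void main(String[] args) {
            // mirrors the concatenation in Security.getTokenSignature():
            // "<db>.<table>/<key>=<value>/..."
            StringBuilder sig = new StringBuilder();
            sig.append("web").append(".").append("clicks");
            Map<String, String> partValues = new LinkedHashMap<String, String>();
            partValues.put("ds", "2013-09-06");
            partValues.put("region", "us");
            for (Map.Entry<String, String> e : partValues.entrySet()) {
                sig.append("/").append(e.getKey()).append("=").append(e.getValue());
            }
            // prints: web.clicks/ds=2013-09-06/region=us
            System.out.println(sig);
        }
    }

The same string feeds both the credential alias (prefixed with the user name) and the HCAT_KEY_TOKEN_SIGNATURE conf entry that cancelToken() later checks before cancelling the delegation token.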