Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC

svn commit: r1520466 [4/18] - in /hive/trunk/hcatalog: core/src/main/java/org/apache/hcatalog/cli/ core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/ core/src/main/java/org/apache/hcatalog/common/ core/src/main/java/org/apache/hcatalog/data/ ...

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/EntityBase.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/EntityBase.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/EntityBase.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/EntityBase.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+/**
+ * This is a base class for
+ * {@link ReadEntity.Builder} / {@link WriteEntity.Builder}.
+ * Many fields are common to both builders, so this
+ * class holds those common fields.
+ */
+
+abstract class EntityBase {
+
+    String region;
+    String tableName;
+    String dbName;
+    Map<String, String> partitionKVs;
+
+    /**
+     * Common methods for {@link ReadEntity} and {@link WriteEntity}
+     */
+
+    abstract static class Entity extends EntityBase {
+
+        public String getRegion() {
+            return region;
+        }
+
+        public String getTableName() {
+            return tableName;
+        }
+
+        public String getDbName() {
+            return dbName;
+        }
+
+        public Map<String, String> getPartitionKVs() {
+            return partitionKVs;
+        }
+    }
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatReader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatReader.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatReader.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/**
+ * This abstract class is internal to HCatalog and abstracts away the
+ * underlying system from which reads will be done.
+ */
+
+public abstract class HCatReader {
+
+    /**
+     * This should be called at the master node to obtain a {@link ReaderContext},
+     * which should then be serialized and sent to the slave nodes.
+     *
+     * @return {@link ReaderContext}
+     * @throws HCatException
+     */
+    public abstract ReaderContext prepareRead() throws HCatException;
+
+    /**
+     * This should be called at the slave nodes to read {@link HCatRecord}s.
+     *
+     * @return {@link Iterator} of {@link HCatRecord}
+     * @throws HCatException
+     */
+    public abstract Iterator<HCatRecord> read() throws HCatException;
+
+    /**
+     * This constructor will be invoked by {@link DataTransferFactory} at the
+     * master node. Don't use this constructor directly; use {@link DataTransferFactory}.
+     *
+     * @param re
+     * @param config
+     */
+    protected HCatReader(final ReadEntity re, final Map<String, String> config) {
+        this(config);
+        this.re = re;
+    }
+
+    /**
+     * This constructor will be invoked by {@link DataTransferFactory} at the
+     * slave nodes. Don't use this constructor directly; use {@link DataTransferFactory}.
+     *
+     * @param config
+     * @param sp
+     */
+
+    protected HCatReader(final Configuration config, StateProvider sp) {
+        this.conf = config;
+        this.sp = sp;
+    }
+
+    protected ReadEntity re; // This will be null at slaves.
+    protected Configuration conf;
+    protected ReaderContext info;
+    protected StateProvider sp; // This will be null at master.
+
+    private HCatReader(final Map<String, String> config) {
+        Configuration conf = new Configuration();
+        if (null != config) {
+            for (Entry<String, String> kv : config.entrySet()) {
+                conf.set(kv.getKey(), kv.getValue());
+            }
+        }
+        this.conf = conf;
+    }
+
+    public Configuration getConf() {
+        if (null == conf) {
+            throw new IllegalStateException(
+                "HCatReader is not constructed correctly.");
+        }
+        return conf;
+    }
+}
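
For illustration, a minimal sketch of the intended master/slave read flow. The
DataTransferFactory entry points are referenced in the Javadoc above but are not
part of this hunk, so the exact factory signatures shown here are assumptions:

    import java.util.HashMap;
    import java.util.Iterator;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.transfer.DataTransferFactory;
    import org.apache.hcatalog.data.transfer.HCatReader;
    import org.apache.hcatalog.data.transfer.ReadEntity;
    import org.apache.hcatalog.data.transfer.ReaderContext;

    public class ReadFlowSketch {

        // Master node: describe what to read and obtain a ReaderContext.
        public static ReaderContext prepare() throws Exception {
            ReadEntity entity = new ReadEntity.Builder()
                .withDatabase("default")
                .withTable("my_table")            // hypothetical table
                .withFilter("ds='2013-09-06'")    // optional partition filter
                .build();
            HCatReader masterReader =
                DataTransferFactory.getHCatReader(entity, new HashMap<String, String>());
            // The returned context is serialized and shipped to the slave nodes.
            return masterReader.prepareRead();
        }

        // Slave node: read the records belonging to one split of the shipped context.
        public static void readSplit(ReaderContext context, InputSplit split) throws Exception {
            HCatReader slaveReader =
                DataTransferFactory.getHCatReader(split, context.getConf());
            Iterator<HCatRecord> records = slaveReader.read();
            while (records.hasNext()) {
                HCatRecord record = records.next();
                // process the record ...
            }
        }
    }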

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatWriter.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatWriter.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/HCatWriter.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+
+/**
+ * This abstraction is internal to HCatalog. It facilitates writing to
+ * HCatalog from external systems. Don't try to instantiate this directly.
+ * Instead, use {@link DataTransferFactory}.
+ */
+
+public abstract class HCatWriter {
+
+    protected Configuration conf;
+    protected WriteEntity we; // This will be null at slave nodes.
+    protected WriterContext info;
+    protected StateProvider sp;
+
+    /**
+     * The external system should invoke this method exactly once from the master node.
+     *
+     * @return {@link WriterContext} This should be serialized and sent to the slave
+     *         nodes to construct an HCatWriter there.
+     * @throws HCatException
+     */
+    public abstract WriterContext prepareWrite() throws HCatException;
+
+    /**
+     * This method should be used at the slave nodes to perform the writes.
+     *
+     * @param recordItr
+     *          {@link Iterator} of records to be written into HCatalog.
+     * @throws HCatException
+     */
+    public abstract void write(final Iterator<HCatRecord> recordItr)
+        throws HCatException;
+
+    /**
+     * This method should be called at the master node. Its primary purpose is to
+     * commit the metadata.
+     *
+     * @throws HCatException
+     */
+    public abstract void commit(final WriterContext context) throws HCatException;
+
+    /**
+     * This method should be called at the master node. Its primary purpose is to
+     * do cleanup in case of failures.
+     *
+     * @throws HCatException
+     */
+    public abstract void abort(final WriterContext context) throws HCatException;
+
+    /**
+     * This constructor will be used at the master node.
+     *
+     * @param we
+     *          WriteEntity defining where in storage the records should be written.
+     * @param config
+     *          Any configuration which the external system wants to communicate to
+     *          HCatalog for performing the writes.
+     */
+    protected HCatWriter(final WriteEntity we, final Map<String, String> config) {
+        this(config);
+        this.we = we;
+    }
+
+    /**
+     * This constructor will be used at slave nodes.
+     *
+     * @param config
+     */
+    protected HCatWriter(final Configuration config, final StateProvider sp) {
+        this.conf = config;
+        this.sp = sp;
+    }
+
+    private HCatWriter(final Map<String, String> config) {
+        Configuration conf = new Configuration();
+        if (config != null) {
+            // The config is provided by the user, so it could be null.
+            for (Entry<String, String> kv : config.entrySet()) {
+                conf.set(kv.getKey(), kv.getValue());
+            }
+        }
+
+        this.conf = conf;
+    }
+}
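
Likewise, a rough sketch of the write flow across master and slave nodes. The
DataTransferFactory.getHCatWriter(...) entry points are referenced by the Javadoc
but not included in this hunk, so their signatures are assumptions:

    import java.util.HashMap;
    import java.util.Iterator;

    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.transfer.DataTransferFactory;
    import org.apache.hcatalog.data.transfer.HCatWriter;
    import org.apache.hcatalog.data.transfer.WriteEntity;
    import org.apache.hcatalog.data.transfer.WriterContext;

    public class WriteFlowSketch {

        // Master node: declare the target table and obtain a WriterContext
        // to serialize and ship to the slave nodes.
        public static WriterContext prepare() throws Exception {
            WriteEntity entity = new WriteEntity.Builder()
                .withDatabase("default")
                .withTable("my_table")   // hypothetical table
                .build();
            HCatWriter masterWriter =
                DataTransferFactory.getHCatWriter(entity, new HashMap<String, String>());
            return masterWriter.prepareWrite();
        }

        // Slave node: write a batch of records using the shipped context.
        public static void writeRecords(WriterContext context, Iterator<HCatRecord> records)
            throws Exception {
            HCatWriter slaveWriter = DataTransferFactory.getHCatWriter(context);
            slaveWriter.write(records);
        }

        // Master node again: commit the metadata once every slave has finished,
        // or abort on failure.
        public static void finish(HCatWriter masterWriter, WriterContext context, boolean ok)
            throws Exception {
            if (ok) {
                masterWriter.commit(context);
            } else {
                masterWriter.abort(context);
            }
        }
    }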

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReadEntity.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReadEntity.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReadEntity.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReadEntity.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+public class ReadEntity extends EntityBase.Entity {
+
+    private String filterString;
+
+    /**
+     * Don't instantiate {@link ReadEntity} directly. Use
+     * {@link ReadEntity.Builder} instead.
+     *
+     */
+    private ReadEntity() {
+        // Not allowed
+    }
+
+    private ReadEntity(Builder builder) {
+
+        this.region = builder.region;
+        this.dbName = builder.dbName;
+        this.tableName = builder.tableName;
+        this.partitionKVs = builder.partitionKVs;
+        this.filterString = builder.filterString;
+    }
+
+    public String getFilterString() {
+        return this.filterString;
+    }
+
+    /**
+     * This class should be used to build {@link ReadEntity}. It follows the
+     * builder pattern, letting you build your {@link ReadEntity} with whatever
+     * level of detail you want.
+     *
+     */
+    public static class Builder extends EntityBase {
+
+        private String filterString;
+
+        public Builder withRegion(final String region) {
+            this.region = region;
+            return this;
+        }
+
+        public Builder withDatabase(final String dbName) {
+            this.dbName = dbName;
+            return this;
+        }
+
+        public Builder withTable(final String tblName) {
+            this.tableName = tblName;
+            return this;
+        }
+
+        public Builder withPartition(final Map<String, String> partKVs) {
+            this.partitionKVs = partKVs;
+            return this;
+        }
+
+        public Builder withFilter(String filterString) {
+            this.filterString = filterString;
+            return this;
+        }
+
+        public ReadEntity build() {
+            return new ReadEntity(this);
+        }
+    }
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReaderContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReaderContext.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReaderContext.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/ReaderContext.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * This class contains the {@link InputSplit}s obtained at the master node,
+ * along with the configuration. This class implements
+ * {@link Externalizable} so it can be serialized using standard Java
+ * mechanisms.
+ */
+public class ReaderContext implements Externalizable, Configurable {
+
+    private static final long serialVersionUID = -2656468331739574367L;
+    private List<InputSplit> splits;
+    private Configuration conf;
+
+    public ReaderContext() {
+        this.splits = new ArrayList<InputSplit>();
+        this.conf = new Configuration();
+    }
+
+    public void setInputSplits(final List<InputSplit> splits) {
+        this.splits = splits;
+    }
+
+    public List<InputSplit> getSplits() {
+        return splits;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(final Configuration config) {
+        conf = config;
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        conf.write(out);
+        out.writeInt(splits.size());
+        for (InputSplit split : splits) {
+            ((HCatSplit) split).write(out);
+        }
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException,
+        ClassNotFoundException {
+        conf.readFields(in);
+        int numOfSplits = in.readInt();
+        for (int i = 0; i < numOfSplits; i++) {
+            HCatSplit split = new HCatSplit();
+            split.readFields(in);
+            splits.add(split);
+        }
+    }
+}
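
Since ReaderContext is Externalizable with a public no-arg constructor, the
standard java.io object streams are enough to move it from the master to the
slaves; a minimal sketch of that shipping step:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.hcatalog.data.transfer.ReaderContext;

    public class ReaderContextShipping {

        // Master side: serialize the context into a byte payload
        // (a file, an RPC message, etc.).
        public static byte[] serialize(ReaderContext context) throws Exception {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(context);   // delegates to ReaderContext.writeExternal()
            out.close();
            return bytes.toByteArray();
        }

        // Slave side: rebuild the context from the shipped payload.
        public static ReaderContext deserialize(byte[] payload) throws Exception {
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload));
            ReaderContext context = (ReaderContext) in.readObject();  // calls readExternal()
            in.close();
            return context;
        }
    }

The same approach applies to WriterContext further down, which follows the
identical Externalizable pattern.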

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriteEntity.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriteEntity.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriteEntity.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriteEntity.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.util.Map;
+
+public class WriteEntity extends EntityBase.Entity {
+
+    /**
+     * Don't instantiate {@link WriteEntity} directly. Use {@link Builder} to
+     * build {@link WriteEntity}.
+     */
+
+    private WriteEntity() {
+        // Not allowed.
+    }
+
+    private WriteEntity(Builder builder) {
+        this.region = builder.region;
+        this.dbName = builder.dbName;
+        this.tableName = builder.tableName;
+        this.partitionKVs = builder.partitionKVs;
+    }
+
+    /**
+     * This class should be used to build {@link WriteEntity}. It follows the
+     * builder pattern, letting you build your {@link WriteEntity} with whatever
+     * level of detail you want.
+     *
+     */
+    public static class Builder extends EntityBase {
+
+        public Builder withRegion(final String region) {
+            this.region = region;
+            return this;
+        }
+
+        public Builder withDatabase(final String dbName) {
+            this.dbName = dbName;
+            return this;
+        }
+
+        public Builder withTable(final String tblName) {
+            this.tableName = tblName;
+            return this;
+        }
+
+        public Builder withPartition(final Map<String, String> partKVs) {
+            this.partitionKVs = partKVs;
+            return this;
+        }
+
+        public WriteEntity build() {
+            return new WriteEntity(this);
+        }
+
+    }
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriterContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriterContext.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriterContext.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/WriterContext.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This contains information obtained at the master node to help prepare the
+ * slave nodes for the writer. This class implements {@link Externalizable} so
+ * it can be serialized using standard Java mechanisms. The master should
+ * serialize it and make it available to the slaves to prepare for writes.
+ */
+public class WriterContext implements Externalizable, Configurable {
+
+    private static final long serialVersionUID = -5899374262971611840L;
+    private Configuration conf;
+
+    public WriterContext() {
+        conf = new Configuration();
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(final Configuration config) {
+        this.conf = config;
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        conf.write(out);
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException,
+        ClassNotFoundException {
+        conf.readFields(in);
+    }
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.HCatReader;
+import org.apache.hcatalog.data.transfer.ReadEntity;
+import org.apache.hcatalog.data.transfer.ReaderContext;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+
+/**
+ * This reader reads via {@link HCatInputFormat}.
+ *
+ */
+public class HCatInputFormatReader extends HCatReader {
+
+    private InputSplit split;
+
+    public HCatInputFormatReader(InputSplit split, Configuration config,
+                                 StateProvider sp) {
+        super(config, sp);
+        this.split = split;
+    }
+
+    public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
+        super(info, config);
+    }
+
+    @Override
+    public ReaderContext prepareRead() throws HCatException {
+        try {
+            Job job = new Job(conf);
+            HCatInputFormat hcif = HCatInputFormat.setInput(
+                job, re.getDbName(), re.getTableName()).setFilter(re.getFilterString());
+            ReaderContext cntxt = new ReaderContext();
+            cntxt.setInputSplits(hcif.getSplits(
+                HCatHadoopShims.Instance.get().createJobContext(job.getConfiguration(), null)));
+            cntxt.setConf(job.getConfiguration());
+            return cntxt;
+        } catch (IOException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        } catch (InterruptedException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        }
+    }
+
+    @Override
+    public Iterator<HCatRecord> read() throws HCatException {
+
+        HCatInputFormat inpFmt = new HCatInputFormat();
+        RecordReader<WritableComparable, HCatRecord> rr;
+        try {
+            TaskAttemptContext cntxt = HCatHadoopShims.Instance.get().createTaskAttemptContext(conf, new TaskAttemptID());
+            rr = inpFmt.createRecordReader(split, cntxt);
+            rr.initialize(split, cntxt);
+        } catch (IOException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        } catch (InterruptedException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        }
+        return new HCatRecordItr(rr);
+    }
+
+    private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+        private RecordReader<WritableComparable, HCatRecord> curRecReader;
+
+        HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
+            curRecReader = rr;
+        }
+
+        @Override
+        public boolean hasNext() {
+            try {
+                boolean retVal = curRecReader.nextKeyValue();
+                if (retVal) {
+                    return true;
+                }
+                // If it's false, we need to close the recordReader.
+                curRecReader.close();
+                return false;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public HCatRecord next() {
+            try {
+                return curRecReader.getCurrentValue();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Not allowed");
+        }
+    }
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.transfer.HCatWriter;
+import org.apache.hcatalog.data.transfer.WriteEntity;
+import org.apache.hcatalog.data.transfer.WriterContext;
+import org.apache.hcatalog.data.transfer.state.StateProvider;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+
+/**
+ * This writer writes via {@link HCatOutputFormat}.
+ *
+ */
+public class HCatOutputFormatWriter extends HCatWriter {
+
+    public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
+        super(we, config);
+    }
+
+    public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
+        super(config, sp);
+    }
+
+    @Override
+    public WriterContext prepareWrite() throws HCatException {
+        OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(),
+            we.getTableName(), we.getPartitionKVs());
+        Job job;
+        try {
+            job = new Job(conf);
+            HCatOutputFormat.setOutput(job, jobInfo);
+            HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
+            HCatOutputFormat outFormat = new HCatOutputFormat();
+            outFormat.checkOutputSpecs(job);
+            outFormat.getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
+                (job.getConfiguration(), HCatHadoopShims.Instance.get().createTaskAttemptID())).setupJob(job);
+        } catch (IOException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        } catch (InterruptedException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        }
+        WriterContext cntxt = new WriterContext();
+        cntxt.setConf(job.getConfiguration());
+        return cntxt;
+    }
+
+    @Override
+    public void write(Iterator<HCatRecord> recordItr) throws HCatException {
+
+        int id = sp.getId();
+        setVarsInConf(id);
+        HCatOutputFormat outFormat = new HCatOutputFormat();
+        TaskAttemptContext cntxt = HCatHadoopShims.Instance.get().createTaskAttemptContext
+            (conf, new TaskAttemptID(HCatHadoopShims.Instance.get().createTaskID(), id));
+        OutputCommitter committer = null;
+        RecordWriter<WritableComparable<?>, HCatRecord> writer;
+        try {
+            committer = outFormat.getOutputCommitter(cntxt);
+            committer.setupTask(cntxt);
+            writer = outFormat.getRecordWriter(cntxt);
+            while (recordItr.hasNext()) {
+                HCatRecord rec = recordItr.next();
+                writer.write(null, rec);
+            }
+            writer.close(cntxt);
+            if (committer.needsTaskCommit(cntxt)) {
+                committer.commitTask(cntxt);
+            }
+        } catch (IOException e) {
+            if (null != committer) {
+                try {
+                    committer.abortTask(cntxt);
+                } catch (IOException e1) {
+                    throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+                }
+            }
+            throw new HCatException("Failed while writing", e);
+        } catch (InterruptedException e) {
+            if (null != committer) {
+                try {
+                    committer.abortTask(cntxt);
+                } catch (IOException e1) {
+                    throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+                }
+            }
+            throw new HCatException("Failed while writing", e);
+        }
+    }
+
+    @Override
+    public void commit(WriterContext context) throws HCatException {
+        try {
+            new HCatOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
+                (context.getConf(), HCatHadoopShims.Instance.get().createTaskAttemptID()))
+                .commitJob(HCatHadoopShims.Instance.get().createJobContext(context.getConf(), null));
+        } catch (IOException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        } catch (InterruptedException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        }
+    }
+
+    @Override
+    public void abort(WriterContext context) throws HCatException {
+        try {
+            new HCatOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext
+                (context.getConf(), HCatHadoopShims.Instance.get().createTaskAttemptID()))
+                .abortJob(HCatHadoopShims.Instance.get().createJobContext(context.getConf(), null), State.FAILED);
+        } catch (IOException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        } catch (InterruptedException e) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+        }
+    }
+
+    private void setVarsInConf(int id) {
+
+        // The following two config keys are required by FileOutputFormat to
+        // work correctly.
+        // In the usual Hadoop case, the JobTracker will set these before
+        // launching tasks.
+        // Since there is no JobTracker here, we set them ourselves.
+        conf.setInt("mapred.task.partition", id);
+        conf.set("mapred.task.id", "attempt__0000_r_000000_" + id);
+    }
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.state;
+
+import java.text.NumberFormat;
+import java.util.Random;
+
+public class DefaultStateProvider implements StateProvider {
+
+    /**
+     * Default implementation. Here, ids are generated randomly.
+     */
+    @Override
+    public int getId() {
+
+        NumberFormat numberFormat = NumberFormat.getInstance();
+        numberFormat.setMinimumIntegerDigits(5);
+        numberFormat.setGroupingUsed(false);
+        return Integer
+            .parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
+    }
+
+    private static StateProvider sp;
+
+    public static synchronized StateProvider get() {
+        if (null == sp) {
+            sp = new DefaultStateProvider();
+        }
+        return sp;
+    }
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/StateProvider.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/StateProvider.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/data/transfer/state/StateProvider.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.data.transfer.state;
+
+/**
+ * If an external system wants to communicate any state to the slaves, it can do
+ * so via this interface. One example of this, in the case of Map-Reduce, is the
+ * ids assigned by the JobTracker to the TaskTrackers.
+ */
+public interface StateProvider {
+
+    /**
+     * This method should return the id assigned to the slave node.
+     *
+     * @return id
+     */
+    public int getId();
+}
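
As a hypothetical example of a non-default implementation, a provider could
forward an id already assigned by the Map-Reduce framework instead of the
random id produced by DefaultStateProvider:

    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hcatalog.data.transfer.state.StateProvider;

    public class TaskIdStateProvider implements StateProvider {

        private final TaskAttemptID taskAttemptId;

        public TaskIdStateProvider(TaskAttemptID taskAttemptId) {
            this.taskAttemptId = taskAttemptId;
        }

        @Override
        public int getId() {
            // Reuse the task number the framework assigned to this slave.
            return taskAttemptId.getTaskID().getId();
        }
    }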

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.har;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.tools.HadoopArchives;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+
+public class HarOutputCommitterPostProcessor {
+
+    boolean isEnabled = false;
+
+    public boolean isEnabled() {
+        return isEnabled;
+    }
+
+    public void setEnabled(boolean enabled) {
+        this.isEnabled = enabled;
+    }
+
+
+    public void exec(JobContext context, Partition partition, Path partPath) throws IOException {
+//    LOG.info("Archiving partition ["+partPath.toString()+"]");
+        makeHar(context, partPath.toUri().toString(), harFile(partPath));
+        partition.getParameters().put(hive_metastoreConstants.IS_ARCHIVED, "true");
+    }
+
+    public String harFile(Path ptnPath) throws IOException {
+        String harFile = ptnPath.toString().replaceFirst("/+$", "") + ".har";
+//    LOG.info("har file : " + harFile);
+        return harFile;
+    }
+
+    public String getParentFSPath(Path ptnPath) throws IOException {
+        return ptnPath.toUri().getPath().replaceFirst("/+$", "");
+    }
+
+    public String getProcessedLocation(Path ptnPath) throws IOException {
+        String harLocn = ("har://" + ptnPath.toUri().getPath()).replaceFirst("/+$", "") + ".har" + Path.SEPARATOR;
+//    LOG.info("har location : " + harLocn);
+        return harLocn;
+    }
+
+
+    /**
+     * Creates a har file from the contents of a given directory, using that as root.
+     * @param dir Directory to archive
+     * @param harFile The HAR file to create
+     */
+    public static void makeHar(JobContext context, String dir, String harFile) throws IOException {
+//    Configuration conf = context.getConfiguration();
+//    Credentials creds = context.getCredentials();
+
+//    HCatUtil.logAllTokens(LOG,context);
+
+        int lastSep = harFile.lastIndexOf(Path.SEPARATOR_CHAR);
+        Path archivePath = new Path(harFile.substring(0, lastSep));
+        final String[] args = {
+            "-archiveName",
+            harFile.substring(lastSep + 1, harFile.length()),
+            "-p",
+            dir,
+            "*",
+            archivePath.toString()
+        };
+//    for (String arg : args){
+//      LOG.info("Args to har : "+ arg);
+//    }
+        try {
+            Configuration newConf = new Configuration();
+            FileSystem fs = archivePath.getFileSystem(newConf);
+
+            String hadoopTokenFileLocationEnvSetting = System.getenv(HCatConstants.SYSENV_HADOOP_TOKEN_FILE_LOCATION);
+            if ((hadoopTokenFileLocationEnvSetting != null) && (!hadoopTokenFileLocationEnvSetting.isEmpty())) {
+                newConf.set(HCatConstants.CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocationEnvSetting);
+//      LOG.info("System.getenv(\"HADOOP_TOKEN_FILE_LOCATION\") =["+  System.getenv("HADOOP_TOKEN_FILE_LOCATION")+"]");
+            }
+//      for (FileStatus ds : fs.globStatus(new Path(dir, "*"))){
+//        LOG.info("src : "+ds.getPath().toUri().toString());
+//      }
+
+            final HadoopArchives har = new HadoopArchives(newConf);
+            int rc = ToolRunner.run(har, args);
+            if (rc != 0) {
+                throw new Exception("Har returned error code " + rc);
+            }
+
+//      for (FileStatus hs : fs.globStatus(new Path(harFile, "*"))){
+//        LOG.info("dest : "+hs.getPath().toUri().toString());
+//      }
+//      doHarCheck(fs,harFile);
+//      LOG.info("Nuking " + dir);
+            fs.delete(new Path(dir), true);
+        } catch (Exception e) {
+            throw new HCatException("Error creating Har [" + harFile + "] from [" + dir + "]", e);
+        }
+    }
+
+}
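
For reference, the args array built in makeHar() corresponds roughly to the
command-line form of the same HadoopArchives tool:

    hadoop archive -archiveName <name>.har -p <parent dir> '*' <dest dir>

followed here by deleting the original directory once the archive is created.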

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Part of the DefaultOutput*Container classes.
+ * See {@link DefaultOutputFormatContainer} for more information.
+ */
+class DefaultOutputCommitterContainer extends OutputCommitterContainer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultOutputCommitterContainer.class);
+
+    /**
+     * @param context current JobContext
+     * @param baseCommitter OutputCommitter to contain
+     * @throws IOException
+     */
+    public DefaultOutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException {
+        super(context, baseCommitter);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+        getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+        getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+        return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
+    }
+
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+        getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context));
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+        getBaseOutputCommitter().setupTask(HCatMapRedUtil.createTaskAttemptContext(context));
+    }
+
+    @Override
+    public void abortJob(JobContext jobContext, State state) throws IOException {
+        getBaseOutputCommitter().abortJob(HCatMapRedUtil.createJobContext(jobContext), state);
+        cleanupJob(jobContext);
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+        getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
+        cleanupJob(jobContext);
+    }
+
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+        getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
+
+        //Cancel HCat and JobTracker tokens
+        HiveMetaStoreClient client = null;
+        try {
+            HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
+            client = HCatUtil.getHiveClient(hiveConf);
+            String tokenStrForm = client.getTokenStrForm();
+            if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+                client.cancelDelegationToken(tokenStrForm);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to cancel delegation token", e);
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
+        }
+    }
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+/**
+ * Bare-bones implementation of OutputFormatContainer. Does only the tasks
+ * required to work properly with HCatalog. HCatalog features which require a
+ * storage-specific implementation (i.e. partitioning) are unsupported.
+ */
+class DefaultOutputFormatContainer extends OutputFormatContainer {
+
+    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+
+    static {
+        NUMBER_FORMAT.setMinimumIntegerDigits(5);
+        NUMBER_FORMAT.setGroupingUsed(false);
+    }
+
+    public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
+        super(of);
+    }
+
+    static synchronized String getOutputName(int partition) {
+        return "part-" + NUMBER_FORMAT.format(partition);
+    }
+
+    /**
+     * Get the record writer for the job. Uses the storage handler's OutputFormat
+     * to get the record writer.
+     * @param context the information about the current task.
+     * @return a RecordWriter to write the output for the job.
+     * @throws IOException
+     */
+    @Override
+    public RecordWriter<WritableComparable<?>, HCatRecord>
+    getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        String name = getOutputName(context.getTaskAttemptID().getTaskID().getId());
+        return new DefaultRecordWriterContainer(context,
+            getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context)));
+    }
+
+
+    /**
+     * Get the output committer for this output format. This is responsible
+     * for ensuring the output is committed correctly.
+     * @param context the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+        return new DefaultOutputCommitterContainer(context, new JobConf(context.getConfiguration()).getOutputCommitter());
+    }
+
+    /**
+     * Check for validity of the output-specification for the job.
+     * @param context information about the job
+     * @throws IOException when output should not be attempted
+     */
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+        org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getBaseOutputFormat();
+        JobConf jobConf = new JobConf(context.getConfiguration());
+        outputFormat.checkOutputSpecs(null, jobConf);
+        HCatUtil.copyConf(jobConf, context.getConfiguration());
+    }
+
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ * Part of the DefaultOutput*Container classes.
+ * See {@link DefaultOutputFormatContainer} for more information.
+ */
+class DefaultRecordWriterContainer extends RecordWriterContainer {
+
+    private final HCatStorageHandler storageHandler;
+    private final SerDe serDe;
+    private final OutputJobInfo jobInfo;
+    private final ObjectInspector hcatRecordOI;
+
+    /**
+     * @param context current JobContext
+     * @param baseRecordWriter RecordWriter to contain
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public DefaultRecordWriterContainer(TaskAttemptContext context,
+                                        org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) throws IOException, InterruptedException {
+        super(context, baseRecordWriter);
+        jobInfo = HCatOutputFormat.getJobInfo(context);
+        storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+        HCatOutputFormat.configureOutputStorageHandler(context);
+        serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
+        hcatRecordOI = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
+        try {
+            InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
+        } catch (SerDeException e) {
+            throw new IOException("Failed to initialize SerDe", e);
+        }
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+        getBaseRecordWriter().close(InternalUtil.createReporter(context));
+    }
+
+    @Override
+    public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
+        InterruptedException {
+        try {
+            getBaseRecordWriter().write(null, serDe.serialize(value.getAll(), hcatRecordOI));
+        } catch (SerDeException e) {
+            throw new IOException("Failed to serialize object", e);
+        }
+    }
+
+}