Posted to hcatalog-commits@incubator.apache.org by av...@apache.org on 2012/08/03 23:14:22 UTC

svn commit: r1369248 - in /incubator/hcatalog/branches/branch-0.4: ./ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/ src/test/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/pig/

Author: avandana
Date: Fri Aug  3 23:14:22 2012
New Revision: 1369248

URL: http://svn.apache.org/viewvc?rev=1369248&view=rev
Log:
HCAT-451 Partitions are created even when Jobs are aborted
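
The root cause is that partition registration lived in cleanupJob, and Pig 0.8/0.9 (and the older MapReduce cleanup path) can invoke cleanupJob even for failed jobs, so an aborted job could still publish its partitions. The patch moves the metastore registration into commitJob and routes both abortJob and cleanupJob through a shared cleanup path that deletes temporary output and cancels delegation tokens. Below is a minimal sketch of that split; it is simplified and not the HCatalog code itself, with helper names that mirror the patch's registerPartitions and internalAbortJob.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobStatus.State;
    import org.apache.hadoop.mapreduce.OutputCommitter;

    // Simplified sketch of the commit/abort split introduced by this patch.
    abstract class PublishOnCommitSketch extends OutputCommitter {
        @Override
        public void commitJob(JobContext context) throws IOException {
            registerPartitions(context);          // metastore writes happen only on success
        }

        @Override
        public void abortJob(JobContext context, State state) throws IOException {
            internalAbortJob(context, state);     // delete temp output, cancel tokens
        }

        @Override
        @Deprecated
        public void cleanupJob(JobContext context) throws IOException {
            internalAbortJob(context, State.FAILED); // Pig 0.8/0.9 still call this
        }

        abstract void registerPartitions(JobContext context) throws IOException;
        abstract void internalAbortJob(JobContext context, State state) throws IOException;
    }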

Added:
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
Modified:
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Fri Aug  3 23:14:22 2012
@@ -51,6 +51,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-451 Partitions are created even when Jobs are aborted (avandana)
+
   HCAT-436 JSON SerDe column misnaming on CTAS (khorgath via gates)
 
   HCAT-449 HCatLoader is mistakenly identifying Configuration parameters to store (cdrome via traviscrawford)

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Fri Aug  3 23:14:22 2012
@@ -136,61 +136,7 @@ class FileOutputCommitterContainer exten
 
     @Override
     public void abortJob(JobContext jobContext, State state) throws IOException {
-        org.apache.hadoop.mapred.JobContext
-                mapRedJobContext = HCatMapRedUtil.createJobContext(jobContext);
-        if (dynamicPartitioningUsed){
-            discoverPartitions(jobContext);
-        }
-
-        if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().abortJob(mapRedJobContext, state);
-        }
-        else if (dynamicPartitioningUsed){
-            for(JobContext currContext : contextDiscoveredByPath.values()){
-                try {
-                    new JobConf(currContext.getConfiguration()).getOutputCommitter().abortJob(currContext, state);
-                } catch (Exception e) {
-                    throw new IOException(e);
-                }
-            }
-        }
-
-        HiveMetaStoreClient client = null;
-        try {
-            HiveConf hiveConf = HCatUtil.getHiveConf(jobContext.getConfiguration());
-            client = HCatUtil.createHiveClient(hiveConf);
-            // cancel the deleg. tokens that were acquired for this job now that
-            // we are done - we should cancel if the tokens were acquired by
-            // HCatOutputFormat and not if they were supplied by Oozie.
-            // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
-            // the conf will not be set
-            String tokenStrForm = client.getTokenStrForm();
-            if(tokenStrForm != null && jobContext.getConfiguration().get
-                    (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
-                client.cancelDelegationToken(tokenStrForm);
-            }
-        } catch(Exception e) {
-            if( e instanceof HCatException ) {
-                throw (HCatException) e;
-            } else {
-                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
-            }
-        } finally {
-            HCatUtil.closeHiveClientQuietly(client);
-        }
-
-        Path src;
-        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
-        if (dynamicPartitioningUsed){
-            src = new Path(getPartitionRootLocation(
-                    jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize()
-            ));
-        }else{
-            src = new Path(jobInfo.getLocation());
-        }
-        FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
-//      LOG.warn("abortJob about to delete ["+src.toString() +"]");
-        fs.delete(src, true);
+        internalAbortJob(jobContext, state);
     }
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -204,187 +150,44 @@ class FileOutputCommitterContainer exten
 
     @Override
     public void commitJob(JobContext jobContext) throws IOException {
-        if (dynamicPartitioningUsed){
-            discoverPartitions(jobContext);
-        }
-        if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
-        }
-        // create _SUCCESS FILE if so requested.
-        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
-        if(getOutputDirMarking(jobContext.getConfiguration())) {
-            Path outputPath = new Path(jobInfo.getLocation());
-            if (outputPath != null) {
-                FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration());
-                // create a file in the folder to mark it
-                if (fileSys.exists(outputPath)) {
-                    Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-                    if(!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
-                        fileSys.create(filePath).close();
-                    }
-                }
-            }
-        }
-        cleanupJob(jobContext);
-    }
-
-    @Override
-    public void cleanupJob(JobContext context) throws IOException {
-
-        if (dynamicPartitioningUsed){
-            discoverPartitions(context);
-        }
-
-
-        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
-        Configuration conf = context.getConfiguration();
-        Table table = jobInfo.getTableInfo().getTable();
-        Path tblPath = new Path(table.getSd().getLocation());
-        FileSystem fs = tblPath.getFileSystem(conf);
-
-        if( table.getPartitionKeys().size() == 0 ) {
-            //non partitioned table
-            if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-               getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
-            }
-            else if (dynamicPartitioningUsed){
-                for(JobContext currContext : contextDiscoveredByPath.values()){
-                    try {
-                        JobConf jobConf = new JobConf(currContext.getConfiguration());
-                        jobConf.getOutputCommitter().cleanupJob(currContext);
-                    } catch (Exception e) {
-                        throw new IOException(e);
-                    }
-                }
-            }
-
-            //Move data from temp directory the actual table directory
-            //No metastore operation required.
-            Path src = new Path(jobInfo.getLocation());
-            moveTaskOutputs(fs, src, src, tblPath, false);
-            fs.delete(src, true);
-            return;
-        }
-
-        HiveMetaStoreClient client = null;
-        HCatTableInfo tableInfo = jobInfo.getTableInfo();
-
-        List<Partition> partitionsAdded = new ArrayList<Partition>();
-
         try {
-            HiveConf hiveConf = HCatUtil.getHiveConf(conf);
-            client = HCatUtil.createHiveClient(hiveConf);
-
-            StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters());
-
-            updateTableSchema(client, table, jobInfo.getOutputSchema());
-
-            FileStatus tblStat = fs.getFileStatus(tblPath);
-            String grpName = tblStat.getGroup();
-            FsPermission perms = tblStat.getPermission();
-
-            List<Partition> partitionsToAdd = new ArrayList<Partition>();
-            if (!dynamicPartitioningUsed){
-                partitionsToAdd.add(
-                        constructPartition(
-                                context,
-                                tblPath.toString(), jobInfo.getPartitionValues()
-                                ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
-                                ,table, fs
-                                ,grpName,perms));
-            }else{
-                for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
-                    partitionsToAdd.add(
-                            constructPartition(
-                                    context,
-                                    getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
-                                    ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
-                                    ,table, fs
-                                    ,grpName,perms));
-                }
+            if (dynamicPartitioningUsed) {
+                discoverPartitions(jobContext);
             }
-
-            //Publish the new partition(s)
-            if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
-
-                Path src = new Path(ptnRootLocation);
-
-                // check here for each dir we're copying out, to see if it already exists, error out if so
-                moveTaskOutputs(fs, src, src, tblPath,true);
-
-                moveTaskOutputs(fs, src, src, tblPath,false);
-                fs.delete(src, true);
-
-
-//          for (Partition partition : partitionsToAdd){
-//            partitionsAdded.add(client.add_partition(partition));
-//            // currently following add_partition instead of add_partitions because latter isn't
-//            // all-or-nothing and we want to be able to roll back partitions we added if need be.
-//          }
-
-                try {
-                    client.add_partitions(partitionsToAdd);
-                    partitionsAdded = partitionsToAdd;
-                } catch (Exception e){
-                    // There was an error adding partitions : rollback fs copy and rethrow
-                    for (Partition p : partitionsToAdd){
-                        Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
-                        if (fs.exists(ptnPath)){
-                            fs.delete(ptnPath,true);
+            if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+                getBaseOutputCommitter().commitJob(
+                        HCatMapRedUtil.createJobContext(jobContext));
+            }
+            registerPartitions(jobContext);
+            // create _SUCCESS FILE if so requested.
+            OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+            if (getOutputDirMarking(jobContext.getConfiguration())) {
+                Path outputPath = new Path(jobInfo.getLocation());
+                if (outputPath != null) {
+                    FileSystem fileSys = outputPath.getFileSystem(jobContext
+                            .getConfiguration());
+                    // create a file in the folder to mark it
+                    if (fileSys.exists(outputPath)) {
+                        Path filePath = new Path(outputPath,
+                                SUCCEEDED_FILE_NAME);
+                        if (!fileSys.exists(filePath)) { // may have been
+                                                         // created by
+                                                         // baseCommitter.commitJob()
+                            fileSys.create(filePath).close();
                         }
                     }
-                    throw e;
                 }
-
-            }else{
-                // no harProcessor, regular operation
-
-                // No duplicate partition publish case to worry about because we'll
-                // get a AlreadyExistsException here if so, and appropriately rollback
-
-                client.add_partitions(partitionsToAdd);
-                partitionsAdded = partitionsToAdd;
-
-                if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
-                    Path src = new Path(ptnRootLocation);
-                    moveTaskOutputs(fs, src, src, tblPath,false);
-                    fs.delete(src, true);
-                }
-
-            }
-
-            if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-                getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
-            }
-
-            if(Security.getInstance().isSecurityEnabled()) {
-                Security.getInstance().cancelToken(client, context);
-            }
-        } catch (Exception e) {
-
-            if( partitionsAdded.size() > 0 ) {
-                try {
-                    //baseCommitter.cleanupJob failed, try to clean up the metastore
-                    for (Partition p : partitionsAdded){
-                        client.dropPartition(tableInfo.getDatabaseName(),
-                                tableInfo.getTableName(), p.getValues());
-                    }
-                } catch(Exception te) {
-                    //Keep cause as the original exception
-                    throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
-                }
-            }
-
-            if( e instanceof HCatException ) {
-                throw (HCatException) e;
-            } else {
-                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
             }
         } finally {
-            HCatUtil.closeHiveClientQuietly(client);
+            cancelDelegationTokens(jobContext);
         }
     }
 
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+        internalAbortJob(context, State.FAILED);
+    }
+
     private String getPartitionRootLocation(String ptnLocn,int numPtnKeys) {
         if (ptnRootLocation  == null){
             // we only need to calculate it once, it'll be the same for other partitions in this job.
@@ -542,6 +345,7 @@ class FileOutputCommitterContainer exten
             //Update table schema to add the newly added columns
             table.getSd().setCols(tableColumns);
             client.alter_table(table.getDbName(), table.getTableName(), table);
+            LOG.info("The columns {} have been added to the table {}.", newColumns, table.getTableName());
         }
     }
 
@@ -681,4 +485,194 @@ class FileOutputCommitterContainer exten
         }
     }
 
+    private void registerPartitions(JobContext context) throws IOException{
+        if (dynamicPartitioningUsed){
+            discoverPartitions(context);
+        }
+        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+        Configuration conf = context.getConfiguration();
+        Table table = jobInfo.getTableInfo().getTable();
+        Path tblPath = new Path(table.getSd().getLocation());
+        FileSystem fs = tblPath.getFileSystem(conf);
+
+        if( table.getPartitionKeys().size() == 0 ) {
+            //Move data from temp directory to the actual table directory
+            //No metastore operation required.
+            Path src = new Path(jobInfo.getLocation());
+            moveTaskOutputs(fs, src, src, tblPath, false);
+            fs.delete(src, true);
+            return;
+        }
+
+        HiveMetaStoreClient client = null;
+        HCatTableInfo tableInfo = jobInfo.getTableInfo();
+        List<Partition> partitionsAdded = new ArrayList<Partition>();
+        try {
+            HiveConf hiveConf = HCatUtil.getHiveConf(conf);
+            client = HCatUtil.createHiveClient(hiveConf);
+            StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters());
+
+            FileStatus tblStat = fs.getFileStatus(tblPath);
+            String grpName = tblStat.getGroup();
+            FsPermission perms = tblStat.getPermission();
+
+            List<Partition> partitionsToAdd = new ArrayList<Partition>();
+            if (!dynamicPartitioningUsed){
+                partitionsToAdd.add(
+                        constructPartition(
+                                context,
+                                tblPath.toString(), jobInfo.getPartitionValues()
+                                ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                                ,table, fs
+                                ,grpName,perms));
+            }else{
+                for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
+                    partitionsToAdd.add(
+                            constructPartition(
+                                    context,
+                                    getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
+                                    ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                                    ,table, fs
+                                    ,grpName,perms));
+                }
+            }
+
+            ArrayList<Map<String,String>> ptnInfos = new ArrayList<Map<String,String>>();
+            for(Partition ptn : partitionsToAdd){
+               ptnInfos.add(InternalUtil.createPtnKeyValueMap(tableInfo.getTable(), ptn));
+            }
+
+            //Publish the new partition(s)
+            if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+
+                Path src = new Path(ptnRootLocation);
+                // check here for each dir we're copying out, to see if it
+                // already exists, error out if so
+                moveTaskOutputs(fs, src, src, tblPath,true);
+                moveTaskOutputs(fs, src, src, tblPath,false);
+                fs.delete(src, true);
+                try {
+                    updateTableSchema(client, table, jobInfo.getOutputSchema());
+                    LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+                    client.add_partitions(partitionsToAdd);
+                    partitionsAdded = partitionsToAdd;
+                } catch (Exception e){
+                    // There was an error adding partitions : rollback fs copy and rethrow
+                    for (Partition p : partitionsToAdd){
+                        Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
+                        if (fs.exists(ptnPath)){
+                            fs.delete(ptnPath,true);
+                        }
+                    }
+                    throw e;
+                }
+
+            }else{
+                // no harProcessor, regular operation
+                // No duplicate partition publish case to worry about because we'll
+                // get an AlreadyExistsException here if so, and roll back appropriately
+                updateTableSchema(client, table, jobInfo.getOutputSchema());
+                LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+                client.add_partitions(partitionsToAdd);
+                partitionsAdded = partitionsToAdd;
+                if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
+                    Path src = new Path(ptnRootLocation);
+                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    fs.delete(src, true);
+                }
+            }
+        } catch (Exception e) {
+          if( partitionsAdded.size() > 0 ) {
+                try {
+                    //baseCommitter.cleanupJob failed, try to clean up the metastore
+                    for (Partition p : partitionsAdded){
+                        client.dropPartition(tableInfo.getDatabaseName(),
+                                tableInfo.getTableName(), p.getValues());
+                    }
+                } catch(Exception te) {
+                    //Keep cause as the original exception
+                    throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+                }
+            }
+            if( e instanceof HCatException ) {
+                throw (HCatException) e;
+            } else {
+                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+            }
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
+        }
+    }
+
+    /**
+     * Shared abort/cleanup logic. The cleanupJob method is deprecated, but
+     * Pig 0.8 and 0.9 still call it, so both abortJob and cleanupJob delegate
+     * here to keep those versions (and their unit tests) working.
+     * @param context The job context.
+     * @param state The final job state passed to the base committer.
+     * @throws java.io.IOException
+     */
+    private void internalAbortJob(JobContext context, State state) throws IOException{
+        try {
+            if (dynamicPartitioningUsed) {
+                discoverPartitions(context);
+            }
+            org.apache.hadoop.mapred.JobContext mapRedJobContext = HCatMapRedUtil
+                    .createJobContext(context);
+            if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+                getBaseOutputCommitter().abortJob(mapRedJobContext, state);
+            } else if (dynamicPartitioningUsed) {
+                for (JobContext currContext : contextDiscoveredByPath.values()) {
+                    try {
+                        new JobConf(currContext.getConfiguration())
+                                .getOutputCommitter().abortJob(currContext,
+                                        state);
+                    } catch (Exception e) {
+                        throw new IOException(e);
+                    }
+                }
+            }
+            Path src;
+            OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+            if (dynamicPartitioningUsed) {
+                src = new Path(getPartitionRootLocation(jobInfo.getLocation()
+                        .toString(), jobInfo.getTableInfo().getTable()
+                        .getPartitionKeysSize()));
+            } else {
+                src = new Path(jobInfo.getLocation());
+            }
+            FileSystem fs = src.getFileSystem(context.getConfiguration());
+            LOG.info("Job failed. Cleaning up temporary directory [{}].", src);
+            fs.delete(src, true);
+        } finally {
+            cancelDelegationTokens(context);
+        }
+    }
+
+    private void cancelDelegationTokens(JobContext context) throws IOException{
+        LOG.info("Cancelling deletgation token for the job.");
+        HiveMetaStoreClient client = null;
+        try {
+            HiveConf hiveConf = HCatUtil
+                    .getHiveConf(context.getConfiguration());
+            client = HCatUtil.createHiveClient(hiveConf);
+            // cancel the deleg. tokens that were acquired for this job now that
+            // we are done - we should cancel if the tokens were acquired by
+            // HCatOutputFormat and not if they were supplied by Oozie.
+            // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
+            // the conf will not be set
+            String tokenStrForm = client.getTokenStrForm();
+            if (tokenStrForm != null
+                    && context.getConfiguration().get(
+                            HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+                client.cancelDelegationToken(tokenStrForm);
+            }
+        } catch (MetaException e) {
+            LOG.warn("MetaException while cancelling delegation token.",e );
+        } catch (TException e) {
+            LOG.warn("TException while cancelling delegation token.", e);
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
+        }
+    }
 }
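
Net effect for callers, visible in the test changes further down: code that used to drive partition publishing through cleanupJob now calls commitJob, and failure handling goes through abortJob. A condensed usage sketch, assuming a Job already configured via HCatOutputFormat.setOutput()/setSchema() and same-package access to the package-private committer (as the tests have); the class and method names here are illustrative only.

    package org.apache.hcatalog.mapreduce;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobStatus.State;

    // Illustrative driver: publish on success, roll back on failure.
    class CommitterUsageSketch {
        static void finish(Job job, boolean succeeded) throws Exception {
            FileOutputCommitterContainer committer =
                    new FileOutputCommitterContainer(job, null);
            if (succeeded) {
                // registerPartitions(), then cancelDelegationTokens() in a finally block
                committer.commitJob(job);
            } else {
                // internalAbortJob(): remove temp output, cancel delegation tokens
                committer.abortJob(job, State.FAILED);
            }
        }
    }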

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Fri Aug  3 23:14:22 2012
@@ -27,7 +27,6 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -103,7 +102,7 @@ public class InitializeInput {
           PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(),
                                               job.getConfiguration(),
                                               inputJobInfo);
-          partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
+          partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table, ptn));
           partInfoList.add(partInfo);
         }
 
@@ -124,27 +123,6 @@ public class InitializeInput {
 
   }
 
-  private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
-    List<String> values = ptn.getValues();
-    if( values.size() != table.getPartitionKeys().size() ) {
-      throw new IOException("Partition values in partition inconsistent with table definition, table "
-          + table.getTableName() + " has "
-          + table.getPartitionKeys().size()
-          + " partition keys, partition has " + values.size() + "partition values" );
-    }
-
-    Map<String,String> ptnKeyValues = new HashMap<String,String>();
-
-    int i = 0;
-    for(FieldSchema schema : table.getPartitionKeys()) {
-      // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
-      ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
-      i++;
-    }
-
-    return ptnKeyValues;
-  }
-
   static PartInfo extractPartInfo(StorageDescriptor sd,
       Map<String,String> parameters, Configuration conf,
       InputJobInfo inputJobInfo) throws IOException{

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Fri Aug  3 23:14:22 2012
@@ -21,7 +21,9 @@ package org.apache.hcatalog.mapreduce;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -45,6 +47,7 @@ import org.apache.hcatalog.data.schema.H
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -56,7 +59,7 @@ class InternalUtil {
         for (String key : properties.keySet()){
             hcatProperties.put(key, properties.get(key));
         }
-        
+
         // also populate with StorageDescriptor->SerDe.Parameters
         for (Map.Entry<String, String>param :
             sd.getSerdeInfo().getParameters().entrySet()) {
@@ -132,20 +135,20 @@ class InternalUtil {
 
   //TODO this has to find a better home, it's also hardcoded as default in hive would be nice
   // if the default was decided by the serde
-  static void initializeOutputSerDe(SerDe serDe, Configuration conf, 
-                                    OutputJobInfo jobInfo) 
+  static void initializeOutputSerDe(SerDe serDe, Configuration conf,
+                                    OutputJobInfo jobInfo)
   throws SerDeException {
-    initializeSerDe(serDe, conf, jobInfo.getTableInfo(), 
-                    jobInfo.getOutputSchema()); 
+    initializeSerDe(serDe, conf, jobInfo.getTableInfo(),
+                    jobInfo.getOutputSchema());
   }
 
-  static void initializeInputSerDe(SerDe serDe, Configuration conf, 
+  static void initializeInputSerDe(SerDe serDe, Configuration conf,
                                    HCatTableInfo info, HCatSchema s)
   throws SerDeException {
-    initializeSerDe(serDe, conf, info, s); 
+    initializeSerDe(serDe, conf, info, s);
   }
 
-  static void initializeSerDe(SerDe serDe, Configuration conf, 
+  static void initializeSerDe(SerDe serDe, Configuration conf,
                               HCatTableInfo info, HCatSchema s)
   throws SerDeException {
      Properties props = new Properties();
@@ -183,4 +186,25 @@ static Reporter createReporter(TaskAttem
           + " but found " + split.getClass().getName());
     }
   }
+
+  static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
+      List<String> values = ptn.getValues();
+      if( values.size() != table.getPartitionKeys().size() ) {
+        throw new IOException("Partition values in partition inconsistent with table definition, table "
+            + table.getTableName() + " has "
+            + table.getPartitionKeys().size()
+            + " partition keys, partition has " + values.size() + "partition values" );
+      }
+
+      Map<String,String> ptnKeyValues = new HashMap<String,String>();
+
+      int i = 0;
+      for(FieldSchema schema : table.getPartitionKeys()) {
+        // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
+        ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+        i++;
+      }
+
+      return ptnKeyValues;
+    }
 }
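
createPtnKeyValueMap has moved from InitializeInput into InternalUtil so the output committer can log the same key/value view of a partition that the input side builds. A small illustrative call with made-up values follows; the method is package-private, so this sketch assumes it is compiled into org.apache.hcatalog.mapreduce.

    package org.apache.hcatalog.mapreduce;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Map;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.Partition;
    import org.apache.hadoop.hive.metastore.api.Table;

    class PtnKeyValueMapSketch {
        static Map<String, String> example() throws IOException {
            Table table = new Table();
            table.setPartitionKeys(Arrays.asList(
                    new FieldSchema("part1", "string", ""),
                    new FieldSchema("part0", "string", "")));
            Partition ptn = new Partition();
            ptn.setValues(Arrays.asList("p1value1", "p0value1"));
            // Keys are lower-cased; value order must follow table.getPartitionKeys().
            return InternalUtil.createPtnKeyValueMap(table, ptn);
            // => {part1=p1value1, part0=p0value1}
        }
    }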

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Aug  3 23:14:22 2012
@@ -27,6 +27,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.Credentials;
@@ -156,7 +157,7 @@ public class HCatStorer extends HCatBase
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
         getOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext(
-        		job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
+        		job.getConfiguration(), new TaskAttemptID())).commitJob(job);
       } catch (IOException e) {
         throw new IOException("Failed to cleanup job",e);
       } catch (InterruptedException e) {
@@ -164,4 +165,23 @@ public class HCatStorer extends HCatBase
       }
     }
   }
+
+   @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        if (job.getConfiguration().get("mapred.job.tracker", "")
+                .equalsIgnoreCase("local")) {
+            try {
+                // This call needs to be removed after MAPREDUCE-1447 is fixed.
+                getOutputFormat().getOutputCommitter(
+                        HCatHadoopShims.Instance.get()
+                                .createTaskAttemptContext(
+                                        job.getConfiguration(),
+                                        new TaskAttemptID())).abortJob(job, State.FAILED);
+            } catch (IOException e) {
+                throw new IOException("Failed to abort job", e);
+            } catch (InterruptedException e) {
+                throw new IOException("Failed to abort job", e);
+            }
+        }
+    }
 }
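
On the Pig side, the local-mode publish call changes from cleanupJob to commitJob, and the new cleanupOnFailure override aborts the job, both as a workaround until MAPREDUCE-1447 is fixed. Pig invokes these hooks itself; the driver method below only illustrates the failure-path contract, and the finishStore name and flow are hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.pig.HCatStorer;

    // Hypothetical front-end sketch: on failure, cleanupOnFailure() now reaches
    // FileOutputCommitterContainer.abortJob() in local mode, so the failed job's
    // temporary output is removed and no partitions are registered.
    class StoreDriverSketch {
        static void finishStore(HCatStorer storer, Job job, String location,
                                boolean succeeded) throws IOException {
            if (!succeeded) {
                storer.cleanupOnFailure(location, job);
            }
        }
    }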

Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Aug  3 23:14:22 2012
@@ -154,7 +154,7 @@ public class TestHCatOutputFormat extend
 
   public void publishTest(Job job) throws Exception {
     OutputCommitter committer = new FileOutputCommitterContainer(job,null);
-    committer.cleanupJob(job);
+    committer.commitJob(job);
 
     Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
     assertNotNull(part);

Added: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java?rev=1369248&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java Fri Aug  3 23:14:22 2012
@@ -0,0 +1,236 @@
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.NoExitSecurityManager;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatPartitionPublish {
+    private static File workDir;
+    private static Configuration mrConf = null;
+    private static FileSystem fs = null;
+    private static MiniMRCluster mrCluster = null;
+    private static boolean isServerRunning = false;
+    private static final int msPort = 20101;
+    private static HiveConf hcatConf;
+    private static HiveMetaStoreClient msc;
+    private static SecurityManager securityManager;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        createWorkDir();
+        Configuration conf = new Configuration(true);
+        fs = FileSystem.get(conf);
+        System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
+        // LocalJobRunner does not work with the mapreduce OutputCommitter, so we
+        // need to use MiniMRCluster. See MAPREDUCE-2350.
+        mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+                new JobConf(conf));
+        mrConf = mrCluster.createJobConf();
+
+        if(isServerRunning) {
+            return;
+          }
+
+          MetaStoreUtils.startMetaStore(msPort, ShimLoader.getHadoopThriftAuthBridge());
+          isServerRunning = true;
+          securityManager = System.getSecurityManager();
+          System.setSecurityManager(new NoExitSecurityManager());
+
+          hcatConf = new HiveConf(TestHCatPartitionPublish.class);
+          hcatConf.set("hive.metastore.local", "false");
+          hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
+          hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+
+          hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName());
+          hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+          hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+          hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+          msc = new HiveMetaStoreClient(hcatConf,null);
+          System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+          System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+    }
+
+    private static void createWorkDir() throws IOException {
+        String testDir = System.getProperty("test.data.dir", "./build/test");
+        testDir = testDir + "/test_hcat_test_ptn" + Math.abs(new Random().nextLong()) + "/";
+        workDir = new File(new File(testDir).getCanonicalPath());
+        FileUtil.fullyDelete(workDir);
+        workDir.mkdirs();
+    }
+
+    @AfterClass
+    public  static void tearDown() throws IOException {
+        if (mrCluster != null) {
+            mrCluster.shutdown();
+        }
+        FileUtil.fullyDelete(workDir);
+        System.setSecurityManager(securityManager);
+        isServerRunning = false;
+    }
+
+    @Test
+    public void testPartitionPublish() throws Exception {
+        String dbName = "default";
+        String tableName = "testHCatPartitionedTable";
+        createTable(null,tableName);
+
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value1");
+        partitionMap.put("part0", "p0value1");
+
+        ArrayList<HCatFieldSchema> hcatTableColumns = new ArrayList<HCatFieldSchema>();
+        for(FieldSchema fs : getTableColumns()){
+            hcatTableColumns.add(HCatSchemaUtils.getHCatFieldSchema(fs));
+        }
+
+        runMRCreateFail(dbName, tableName, partitionMap, hcatTableColumns);
+        List<String> ptns = msc.listPartitionNames(dbName, tableName, (short)10);
+        Assert.assertEquals(0, ptns.size());
+        Table table = msc.getTable(dbName, tableName);
+        Assert.assertTrue(table != null);
+        // Also make sure that the directory has been deleted in the table location.
+        Assert.assertFalse(fs.exists(new Path(table.getSd().getLocation() + "/part1=p1value1/part0=p0value1")));
+    }
+
+    void runMRCreateFail(String dbName, String tableName, Map<String, String> partitionValues,
+            List<HCatFieldSchema> columns) throws Exception {
+
+        Job job = new Job(mrConf, "hcat mapreduce write fail test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(TestHCatPartitionPublish.MapFail.class);
+
+        // input/output settings
+        job.setInputFormatClass(TextInputFormat.class);
+
+        Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+        //The write count does not matter, as the map will fail in its first call.
+        createInputFile(path, 5);
+
+        TextInputFormat.setInputPaths(job, path);
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
+        HCatOutputFormat.setOutput(job, outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        HCatOutputFormat.setSchema(job, new HCatSchema(columns));
+
+        boolean success = job.waitForCompletion(true);
+        Assert.assertTrue(success == false);
+      }
+
+    private void createInputFile(Path path, int rowCount) throws IOException {
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
+        FSDataOutputStream os = fs.create(path);
+        for (int i = 0; i < rowCount; i++) {
+            os.writeChars(i + "\n");
+        }
+        os.close();
+    }
+
+    public static class MapFail extends
+    Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+      @Override
+      public void map(LongWritable key, Text value, Context context
+      ) throws IOException, InterruptedException {
+        {
+          throw new IOException("Exception to mimic job failure.");
+        }
+      }
+    }
+
+    private void createTable(String dbName,String tableName) throws Exception{
+        String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+        try {
+          msc.dropTable(databaseName, tableName);
+        } catch(Exception e) {
+        } //can fail with NoSuchObjectException
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType("MANAGED_TABLE");
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(getTableColumns());
+        tbl.setPartitionKeys(getPartitionKeys());
+        tbl.setSd(sd);
+        sd.setBucketCols(new ArrayList<String>(2));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters().put(
+            org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+        sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName());
+        sd.setInputFormat(RCFileInputFormat.class.getName());
+        sd.setOutputFormat(RCFileOutputFormat.class.getName());
+
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tbl.setParameters(tableParams);
+
+        msc.createTable(tbl);
+    }
+
+    protected List<FieldSchema> getPartitionKeys() {
+      List<FieldSchema> fields = new ArrayList<FieldSchema>();
+      //Defining partition names in unsorted order
+      fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
+      fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, ""));
+      return fields;
+    }
+
+    protected List<FieldSchema> getTableColumns() {
+      List<FieldSchema> fields = new ArrayList<FieldSchema>();
+      fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+      fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+      return fields;
+    }
+
+}

Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java Fri Aug  3 23:14:22 2012
@@ -167,7 +167,7 @@ public class TestSequenceFileReadWrite {
         HCatOutputFormat.setSchema(job, getSchema());
         job.setNumReduceTasks(0);
         assertTrue(job.waitForCompletion(true));
-        new FileOutputCommitterContainer(job, null).cleanupJob(job);
+        new FileOutputCommitterContainer(job, null).commitJob(job);
         assertTrue(job.isSuccessful());
 
         server.setBatchOn();
@@ -211,7 +211,7 @@ public class TestSequenceFileReadWrite {
         job.setOutputFormatClass(HCatOutputFormat.class);
         HCatOutputFormat.setSchema(job, getSchema());
         assertTrue(job.waitForCompletion(true));
-        new FileOutputCommitterContainer(job, null).cleanupJob(job);
+        new FileOutputCommitterContainer(job, null).commitJob(job);
         assertTrue(job.isSuccessful());
 
         server.setBatchOn();
@@ -254,4 +254,4 @@ public class TestSequenceFileReadWrite {
       return schema;
   }
 
-}
\ No newline at end of file
+}

Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java Fri Aug  3 23:14:22 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hcatalog.pig;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hive.ql.Command
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hcatalog.HcatTestUtils;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
@@ -37,6 +39,7 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.LogUtils;
+import org.junit.Assert;
 
 public class TestHCatStorer extends TestCase {
   private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
@@ -639,4 +642,57 @@ public class TestHCatStorer extends Test
       assertEquals(0, results.size());
       driver.run("drop table employee");
     }
+
+    public void testPartitionPublish() throws IOException, CommandNeedRetryException{
+
+        driver.run("drop table ptn_fail");
+        String createTable = "create table ptn_fail(a int, c string) partitioned by (b string) stored as RCFILE";
+        int retCode = driver.run(createTable).getResponseCode();
+        if(retCode != 0) {
+          throw new IOException("Failed to create table.");
+        }
+        int LOOP_SIZE = 11;
+        String[] input = new String[LOOP_SIZE];
+
+        for(int i = 0; i < LOOP_SIZE; i++) {
+            input[i] = i + "\tmath";
+        }
+        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("A = load '"+ INPUT_FILE_NAME+"' as (a:int, c:chararray);");
+        server.registerQuery("B = filter A by " + FailEvalFunc.class.getName() + "($0);");
+        server.registerQuery("store B into 'ptn_fail' using "+HCatStorer.class.getName()+"('b=math');");
+        server.executeBatch();
+
+        String query = "show partitions ptn_fail";
+        retCode = driver.run(query).getResponseCode();
+
+        if( retCode != 0 ) {
+          throw new IOException("Error " + retCode + " running query " + query);
+        }
+
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        assertEquals(0, res.size());
+
+        //Make sure the partition directory was not left under the table location.
+        Assert.assertTrue((new File(TEST_WAREHOUSE_DIR + "/ptn_fail")).exists());
+        Assert.assertFalse((new File(TEST_WAREHOUSE_DIR + "/ptn_fail/b=math")).exists());
+      }
+
+    static public class FailEvalFunc extends EvalFunc<Boolean> {
+
+        /**
+         * @param tuple the input tuple (ignored)
+         * @throws IOException always, to mimic a task failure
+         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public Boolean exec(Tuple tuple) throws IOException {
+            throw new IOException("Eval Func to mimic Failure.");
+        }
+
+    }
+
 }