You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by av...@apache.org on 2012/08/03 23:14:22 UTC
svn commit: r1369248 - in /incubator/hcatalog/branches/branch-0.4: ./
src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/
src/test/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/pig/
Author: avandana
Date: Fri Aug 3 23:14:22 2012
New Revision: 1369248
URL: http://svn.apache.org/viewvc?rev=1369248&view=rev
Log:
HCAT-451 Partitions are created even when Jobs are aborted
Added:
incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
Modified:
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java
incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Fri Aug 3 23:14:22 2012
@@ -51,6 +51,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-451 Partitions are created even when Jobs are aborted (avandana)
+
HCAT-436 JSON SerDe column misnaming on CTAS (khorgath via gates)
HCAT-449 HCatLoader is mistakenly identifying Configuration parameters to store (cdrome via traviscrawford)
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Fri Aug 3 23:14:22 2012
@@ -136,61 +136,7 @@ class FileOutputCommitterContainer exten
@Override
public void abortJob(JobContext jobContext, State state) throws IOException {
- org.apache.hadoop.mapred.JobContext
- mapRedJobContext = HCatMapRedUtil.createJobContext(jobContext);
- if (dynamicPartitioningUsed){
- discoverPartitions(jobContext);
- }
-
- if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
- getBaseOutputCommitter().abortJob(mapRedJobContext, state);
- }
- else if (dynamicPartitioningUsed){
- for(JobContext currContext : contextDiscoveredByPath.values()){
- try {
- new JobConf(currContext.getConfiguration()).getOutputCommitter().abortJob(currContext, state);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
-
- HiveMetaStoreClient client = null;
- try {
- HiveConf hiveConf = HCatUtil.getHiveConf(jobContext.getConfiguration());
- client = HCatUtil.createHiveClient(hiveConf);
- // cancel the deleg. tokens that were acquired for this job now that
- // we are done - we should cancel if the tokens were acquired by
- // HCatOutputFormat and not if they were supplied by Oozie.
- // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
- // the conf will not be set
- String tokenStrForm = client.getTokenStrForm();
- if(tokenStrForm != null && jobContext.getConfiguration().get
- (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
- client.cancelDelegationToken(tokenStrForm);
- }
- } catch(Exception e) {
- if( e instanceof HCatException ) {
- throw (HCatException) e;
- } else {
- throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
- }
- } finally {
- HCatUtil.closeHiveClientQuietly(client);
- }
-
- Path src;
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
- if (dynamicPartitioningUsed){
- src = new Path(getPartitionRootLocation(
- jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize()
- ));
- }else{
- src = new Path(jobInfo.getLocation());
- }
- FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
-// LOG.warn("abortJob about to delete ["+src.toString() +"]");
- fs.delete(src, true);
+ internalAbortJob(jobContext, state);
}
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -204,187 +150,44 @@ class FileOutputCommitterContainer exten
@Override
public void commitJob(JobContext jobContext) throws IOException {
- if (dynamicPartitioningUsed){
- discoverPartitions(jobContext);
- }
- if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
- getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
- }
- // create _SUCCESS FILE if so requested.
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
- if(getOutputDirMarking(jobContext.getConfiguration())) {
- Path outputPath = new Path(jobInfo.getLocation());
- if (outputPath != null) {
- FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration());
- // create a file in the folder to mark it
- if (fileSys.exists(outputPath)) {
- Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
- if(!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
- fileSys.create(filePath).close();
- }
- }
- }
- }
- cleanupJob(jobContext);
- }
-
- @Override
- public void cleanupJob(JobContext context) throws IOException {
-
- if (dynamicPartitioningUsed){
- discoverPartitions(context);
- }
-
-
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
- Configuration conf = context.getConfiguration();
- Table table = jobInfo.getTableInfo().getTable();
- Path tblPath = new Path(table.getSd().getLocation());
- FileSystem fs = tblPath.getFileSystem(conf);
-
- if( table.getPartitionKeys().size() == 0 ) {
- //non partitioned table
- if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
- getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
- }
- else if (dynamicPartitioningUsed){
- for(JobContext currContext : contextDiscoveredByPath.values()){
- try {
- JobConf jobConf = new JobConf(currContext.getConfiguration());
- jobConf.getOutputCommitter().cleanupJob(currContext);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
-
- //Move data from temp directory the actual table directory
- //No metastore operation required.
- Path src = new Path(jobInfo.getLocation());
- moveTaskOutputs(fs, src, src, tblPath, false);
- fs.delete(src, true);
- return;
- }
-
- HiveMetaStoreClient client = null;
- HCatTableInfo tableInfo = jobInfo.getTableInfo();
-
- List<Partition> partitionsAdded = new ArrayList<Partition>();
-
try {
- HiveConf hiveConf = HCatUtil.getHiveConf(conf);
- client = HCatUtil.createHiveClient(hiveConf);
-
- StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters());
-
- updateTableSchema(client, table, jobInfo.getOutputSchema());
-
- FileStatus tblStat = fs.getFileStatus(tblPath);
- String grpName = tblStat.getGroup();
- FsPermission perms = tblStat.getPermission();
-
- List<Partition> partitionsToAdd = new ArrayList<Partition>();
- if (!dynamicPartitioningUsed){
- partitionsToAdd.add(
- constructPartition(
- context,
- tblPath.toString(), jobInfo.getPartitionValues()
- ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
- ,table, fs
- ,grpName,perms));
- }else{
- for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
- partitionsToAdd.add(
- constructPartition(
- context,
- getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
- ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
- ,table, fs
- ,grpName,perms));
- }
+ if (dynamicPartitioningUsed) {
+ discoverPartitions(jobContext);
}
-
- //Publish the new partition(s)
- if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
-
- Path src = new Path(ptnRootLocation);
-
- // check here for each dir we're copying out, to see if it already exists, error out if so
- moveTaskOutputs(fs, src, src, tblPath,true);
-
- moveTaskOutputs(fs, src, src, tblPath,false);
- fs.delete(src, true);
-
-
-// for (Partition partition : partitionsToAdd){
-// partitionsAdded.add(client.add_partition(partition));
-// // currently following add_partition instead of add_partitions because latter isn't
-// // all-or-nothing and we want to be able to roll back partitions we added if need be.
-// }
-
- try {
- client.add_partitions(partitionsToAdd);
- partitionsAdded = partitionsToAdd;
- } catch (Exception e){
- // There was an error adding partitions : rollback fs copy and rethrow
- for (Partition p : partitionsToAdd){
- Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
- if (fs.exists(ptnPath)){
- fs.delete(ptnPath,true);
+ if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+ getBaseOutputCommitter().commitJob(
+ HCatMapRedUtil.createJobContext(jobContext));
+ }
+ registerPartitions(jobContext);
+ // create _SUCCESS FILE if so requested.
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+ if (getOutputDirMarking(jobContext.getConfiguration())) {
+ Path outputPath = new Path(jobInfo.getLocation());
+ if (outputPath != null) {
+ FileSystem fileSys = outputPath.getFileSystem(jobContext
+ .getConfiguration());
+ // create a file in the folder to mark it
+ if (fileSys.exists(outputPath)) {
+ Path filePath = new Path(outputPath,
+ SUCCEEDED_FILE_NAME);
+ if (!fileSys.exists(filePath)) { // may have been
+ // created by
+ // baseCommitter.commitJob()
+ fileSys.create(filePath).close();
}
}
- throw e;
}
-
- }else{
- // no harProcessor, regular operation
-
- // No duplicate partition publish case to worry about because we'll
- // get a AlreadyExistsException here if so, and appropriately rollback
-
- client.add_partitions(partitionsToAdd);
- partitionsAdded = partitionsToAdd;
-
- if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
- Path src = new Path(ptnRootLocation);
- moveTaskOutputs(fs, src, src, tblPath,false);
- fs.delete(src, true);
- }
-
- }
-
- if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
- getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
- }
-
- if(Security.getInstance().isSecurityEnabled()) {
- Security.getInstance().cancelToken(client, context);
- }
- } catch (Exception e) {
-
- if( partitionsAdded.size() > 0 ) {
- try {
- //baseCommitter.cleanupJob failed, try to clean up the metastore
- for (Partition p : partitionsAdded){
- client.dropPartition(tableInfo.getDatabaseName(),
- tableInfo.getTableName(), p.getValues());
- }
- } catch(Exception te) {
- //Keep cause as the original exception
- throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
- }
- }
-
- if( e instanceof HCatException ) {
- throw (HCatException) e;
- } else {
- throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
}
} finally {
- HCatUtil.closeHiveClientQuietly(client);
+ cancelDelegationTokens(jobContext);
}
}
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ internalAbortJob(context, State.FAILED);
+ }
+
private String getPartitionRootLocation(String ptnLocn,int numPtnKeys) {
if (ptnRootLocation == null){
// we only need to calculate it once, it'll be the same for other partitions in this job.
@@ -542,6 +345,7 @@ class FileOutputCommitterContainer exten
//Update table schema to add the newly added columns
table.getSd().setCols(tableColumns);
client.alter_table(table.getDbName(), table.getTableName(), table);
+ LOG.info("The columns {} have been added to the table {}.", newColumns, table.getTableName());
}
}
@@ -681,4 +485,194 @@ class FileOutputCommitterContainer exten
}
}
+ private void registerPartitions(JobContext context) throws IOException{
+ if (dynamicPartitioningUsed){
+ discoverPartitions(context);
+ }
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+ Configuration conf = context.getConfiguration();
+ Table table = jobInfo.getTableInfo().getTable();
+ Path tblPath = new Path(table.getSd().getLocation());
+ FileSystem fs = tblPath.getFileSystem(conf);
+
+ if( table.getPartitionKeys().size() == 0 ) {
+ //Move data from the temp directory to the actual table directory
+ //No metastore operation required.
+ Path src = new Path(jobInfo.getLocation());
+ moveTaskOutputs(fs, src, src, tblPath, false);
+ fs.delete(src, true);
+ return;
+ }
+
+ HiveMetaStoreClient client = null;
+ HCatTableInfo tableInfo = jobInfo.getTableInfo();
+ List<Partition> partitionsAdded = new ArrayList<Partition>();
+ try {
+ HiveConf hiveConf = HCatUtil.getHiveConf(conf);
+ client = HCatUtil.createHiveClient(hiveConf);
+ StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters());
+
+ FileStatus tblStat = fs.getFileStatus(tblPath);
+ String grpName = tblStat.getGroup();
+ FsPermission perms = tblStat.getPermission();
+
+ List<Partition> partitionsToAdd = new ArrayList<Partition>();
+ if (!dynamicPartitioningUsed){
+ partitionsToAdd.add(
+ constructPartition(
+ context,
+ tblPath.toString(), jobInfo.getPartitionValues()
+ ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+ ,table, fs
+ ,grpName,perms));
+ }else{
+ for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
+ partitionsToAdd.add(
+ constructPartition(
+ context,
+ getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
+ ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+ ,table, fs
+ ,grpName,perms));
+ }
+ }
+
+ ArrayList<Map<String,String>> ptnInfos = new ArrayList<Map<String,String>>();
+ for(Partition ptn : partitionsToAdd){
+ ptnInfos.add(InternalUtil.createPtnKeyValueMap(tableInfo.getTable(), ptn));
+ }
+
+ //Publish the new partition(s)
+ if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+
+ Path src = new Path(ptnRootLocation);
+ // check here for each dir we're copying out, to see if it
+ // already exists, error out if so
+ moveTaskOutputs(fs, src, src, tblPath,true);
+ moveTaskOutputs(fs, src, src, tblPath,false);
+ fs.delete(src, true);
+ try {
+ updateTableSchema(client, table, jobInfo.getOutputSchema());
+ LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+ client.add_partitions(partitionsToAdd);
+ partitionsAdded = partitionsToAdd;
+ } catch (Exception e){
+ // There was an error adding partitions : rollback fs copy and rethrow
+ for (Partition p : partitionsToAdd){
+ Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
+ if (fs.exists(ptnPath)){
+ fs.delete(ptnPath,true);
+ }
+ }
+ throw e;
+ }
+
+ }else{
+ // no harProcessor, regular operation
+ // No duplicate partition publish case to worry about because we'll
+ // get an AlreadyExistsException here if so, and appropriately roll back
+ updateTableSchema(client, table, jobInfo.getOutputSchema());
+ LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+ client.add_partitions(partitionsToAdd);
+ partitionsAdded = partitionsToAdd;
+ if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
+ Path src = new Path(ptnRootLocation);
+ moveTaskOutputs(fs, src, src, tblPath,false);
+ fs.delete(src, true);
+ }
+ }
+ } catch (Exception e) {
+ if( partitionsAdded.size() > 0 ) {
+ try {
+ //Publishing failed after partitions were added, try to clean up the metastore
+ for (Partition p : partitionsAdded){
+ client.dropPartition(tableInfo.getDatabaseName(),
+ tableInfo.getTableName(), p.getValues());
+ }
+ } catch(Exception te) {
+ //Keep cause as the original exception
+ throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+ }
+ }
+ if( e instanceof HCatException ) {
+ throw (HCatException) e;
+ } else {
+ throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+ }
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
+ }
+ }
+
+ /**
+ * This method exists to ensure unit tests run with Pig 0.8 and
+ * 0.9 versions. The cleanupJob method is deprecated, but Pig 0.8 and
+ * 0.9 still call it. Hence this method is used by both the abortJob
+ * and cleanupJob methods.
+ * @param context The job context.
+ * @param state The job state passed on to the underlying committer's abortJob.
+ * @throws java.io.IOException
+ */
+ private void internalAbortJob(JobContext context, State state) throws IOException{
+ try {
+ if (dynamicPartitioningUsed) {
+ discoverPartitions(context);
+ }
+ org.apache.hadoop.mapred.JobContext mapRedJobContext = HCatMapRedUtil
+ .createJobContext(context);
+ if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+ getBaseOutputCommitter().abortJob(mapRedJobContext, state);
+ } else if (dynamicPartitioningUsed) {
+ for (JobContext currContext : contextDiscoveredByPath.values()) {
+ try {
+ new JobConf(currContext.getConfiguration())
+ .getOutputCommitter().abortJob(currContext,
+ state);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ Path src;
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+ if (dynamicPartitioningUsed) {
+ src = new Path(getPartitionRootLocation(jobInfo.getLocation()
+ .toString(), jobInfo.getTableInfo().getTable()
+ .getPartitionKeysSize()));
+ } else {
+ src = new Path(jobInfo.getLocation());
+ }
+ FileSystem fs = src.getFileSystem(context.getConfiguration());
+ LOG.info("Job failed. Cleaning up temporary directory [{}].", src);
+ fs.delete(src, true);
+ } finally {
+ cancelDelegationTokens(context);
+ }
+ }
+
+ private void cancelDelegationTokens(JobContext context) throws IOException{
+ LOG.info("Cancelling deletgation token for the job.");
+ HiveMetaStoreClient client = null;
+ try {
+ HiveConf hiveConf = HCatUtil
+ .getHiveConf(context.getConfiguration());
+ client = HCatUtil.createHiveClient(hiveConf);
+ // cancel the deleg. tokens that were acquired for this job now that
+ // we are done - we should cancel if the tokens were acquired by
+ // HCatOutputFormat and not if they were supplied by Oozie.
+ // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
+ // the conf will not be set
+ String tokenStrForm = client.getTokenStrForm();
+ if (tokenStrForm != null
+ && context.getConfiguration().get(
+ HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+ client.cancelDelegationToken(tokenStrForm);
+ }
+ } catch (MetaException e) {
+ LOG.warn("MetaException while cancelling delegation token.",e );
+ } catch (TException e) {
+ LOG.warn("TException while cancelling delegation token.", e);
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
+ }
+ }
}
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Fri Aug 3 23:14:22 2012
@@ -27,7 +27,6 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -103,7 +102,7 @@ public class InitializeInput {
PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(),
job.getConfiguration(),
inputJobInfo);
- partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
+ partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table, ptn));
partInfoList.add(partInfo);
}
@@ -124,27 +123,6 @@ public class InitializeInput {
}
- private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
- List<String> values = ptn.getValues();
- if( values.size() != table.getPartitionKeys().size() ) {
- throw new IOException("Partition values in partition inconsistent with table definition, table "
- + table.getTableName() + " has "
- + table.getPartitionKeys().size()
- + " partition keys, partition has " + values.size() + "partition values" );
- }
-
- Map<String,String> ptnKeyValues = new HashMap<String,String>();
-
- int i = 0;
- for(FieldSchema schema : table.getPartitionKeys()) {
- // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
- ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
- i++;
- }
-
- return ptnKeyValues;
- }
-
static PartInfo extractPartInfo(StorageDescriptor sd,
Map<String,String> parameters, Configuration conf,
InputJobInfo inputJobInfo) throws IOException{
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Fri Aug 3 23:14:22 2012
@@ -21,7 +21,9 @@ package org.apache.hcatalog.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -45,6 +47,7 @@ import org.apache.hcatalog.data.schema.H
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -56,7 +59,7 @@ class InternalUtil {
for (String key : properties.keySet()){
hcatProperties.put(key, properties.get(key));
}
-
+
// also populate with StorageDescriptor->SerDe.Parameters
for (Map.Entry<String, String>param :
sd.getSerdeInfo().getParameters().entrySet()) {
@@ -132,20 +135,20 @@ class InternalUtil {
//TODO this has to find a better home, it's also hardcoded as default in hive would be nice
// if the default was decided by the serde
- static void initializeOutputSerDe(SerDe serDe, Configuration conf,
- OutputJobInfo jobInfo)
+ static void initializeOutputSerDe(SerDe serDe, Configuration conf,
+ OutputJobInfo jobInfo)
throws SerDeException {
- initializeSerDe(serDe, conf, jobInfo.getTableInfo(),
- jobInfo.getOutputSchema());
+ initializeSerDe(serDe, conf, jobInfo.getTableInfo(),
+ jobInfo.getOutputSchema());
}
- static void initializeInputSerDe(SerDe serDe, Configuration conf,
+ static void initializeInputSerDe(SerDe serDe, Configuration conf,
HCatTableInfo info, HCatSchema s)
throws SerDeException {
- initializeSerDe(serDe, conf, info, s);
+ initializeSerDe(serDe, conf, info, s);
}
- static void initializeSerDe(SerDe serDe, Configuration conf,
+ static void initializeSerDe(SerDe serDe, Configuration conf,
HCatTableInfo info, HCatSchema s)
throws SerDeException {
Properties props = new Properties();
@@ -183,4 +186,25 @@ static Reporter createReporter(TaskAttem
+ " but found " + split.getClass().getName());
}
}
+
+ static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
+ List<String> values = ptn.getValues();
+ if( values.size() != table.getPartitionKeys().size() ) {
+ throw new IOException("Partition values in partition inconsistent with table definition, table "
+ + table.getTableName() + " has "
+ + table.getPartitionKeys().size()
+ + " partition keys, partition has " + values.size() + "partition values" );
+ }
+
+ Map<String,String> ptnKeyValues = new HashMap<String,String>();
+
+ int i = 0;
+ for(FieldSchema schema : table.getPartitionKeys()) {
+ // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
+ ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+ i++;
+ }
+
+ return ptnKeyValues;
+ }
}
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Aug 3 23:14:22 2012
@@ -27,6 +27,7 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.Credentials;
@@ -156,7 +157,7 @@ public class HCatStorer extends HCatBase
//Calling it from here so that the partition publish happens.
//This call needs to be removed after MAPREDUCE-1447 is fixed.
getOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext(
- job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
+ job.getConfiguration(), new TaskAttemptID())).commitJob(job);
} catch (IOException e) {
throw new IOException("Failed to cleanup job",e);
} catch (InterruptedException e) {
@@ -164,4 +165,23 @@ public class HCatStorer extends HCatBase
}
}
}
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ if (job.getConfiguration().get("mapred.job.tracker", "")
+ .equalsIgnoreCase("local")) {
+ try {
+ // This call needs to be removed after MAPREDUCE-1447 is fixed.
+ getOutputFormat().getOutputCommitter(
+ HCatHadoopShims.Instance.get()
+ .createTaskAttemptContext(
+ job.getConfiguration(),
+ new TaskAttemptID())).abortJob(job, State.FAILED);
+ } catch (IOException e) {
+ throw new IOException("Failed to abort job", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to abort job", e);
+ }
+ }
+ }
}
Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Aug 3 23:14:22 2012
@@ -154,7 +154,7 @@ public class TestHCatOutputFormat extend
public void publishTest(Job job) throws Exception {
OutputCommitter committer = new FileOutputCommitterContainer(job,null);
- committer.cleanupJob(job);
+ committer.commitJob(job);
Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
assertNotNull(part);
Added: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java?rev=1369248&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (added)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java Fri Aug 3 23:14:22 2012
@@ -0,0 +1,236 @@
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.NoExitSecurityManager;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatPartitionPublish {
+ private static File workDir;
+ private static Configuration mrConf = null;
+ private static FileSystem fs = null;
+ private static MiniMRCluster mrCluster = null;
+ private static boolean isServerRunning = false;
+ private static final int msPort = 20101;
+ private static HiveConf hcatConf;
+ private static HiveMetaStoreClient msc;
+ private static SecurityManager securityManager;
+
+    /**
+     * One-time fixture for this class: creates the work directory, brings up a
+     * MiniMRCluster, starts a metastore on {@code msPort} and builds the shared
+     * {@code HiveMetaStoreClient}.
+     *
+     * @throws Exception if any part of the fixture cannot be started
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        createWorkDir();
+        Configuration baseConf = new Configuration(true);
+        fs = FileSystem.get(baseConf);
+
+        File logDir = new File(workDir, "/logs");
+        System.setProperty("hadoop.log.dir", logDir.getAbsolutePath());
+
+        // LocalJobRunner does not work with mapreduce OutputCommitter. So need
+        // to use MiniMRCluster. MAPREDUCE-2350
+        String fsUri = fs.getUri().toString();
+        mrCluster = new MiniMRCluster(1, fsUri, 1, null, null, new JobConf(baseConf));
+        mrConf = mrCluster.createJobConf();
+
+        if (isServerRunning) {
+            // Metastore and client were set up by an earlier call; nothing else to do.
+            return;
+        }
+
+        MetaStoreUtils.startMetaStore(msPort, ShimLoader.getHadoopThriftAuthBridge());
+        isServerRunning = true;
+
+        // Remember the current SecurityManager so tearDown() can put it back.
+        securityManager = System.getSecurityManager();
+        System.setSecurityManager(new NoExitSecurityManager());
+
+        // Client-side configuration pointing at the metastore started above.
+        String metastoreUri = "thrift://localhost:" + msPort;
+        hcatConf = new HiveConf(TestHCatPartitionPublish.class);
+        hcatConf.set("hive.metastore.local", "false");
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+
+        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        msc = new HiveMetaStoreClient(hcatConf, null);
+
+        // Set only after the client exists, exactly as in the original ordering.
+        System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+        System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+    }
+
+    /**
+     * Creates an empty, randomly named working directory and stores it in
+     * {@code workDir}. The parent comes from the {@code test.data.dir} system
+     * property and defaults to {@code ./build/test}.
+     *
+     * @throws IOException if the canonical path cannot be resolved
+     */
+    private static void createWorkDir() throws IOException {
+        String baseDir = System.getProperty("test.data.dir", "./build/test");
+        long suffix = Math.abs(new Random().nextLong());
+        File candidate = new File(baseDir + "/test_hcat_test_ptn" + suffix + "/");
+
+        workDir = new File(candidate.getCanonicalPath());
+        // Remove anything left at that location, then recreate it empty.
+        FileUtil.fullyDelete(workDir);
+        workDir.mkdirs();
+    }
+
+    /**
+     * Class-level cleanup: shuts down the MiniMRCluster if one was created,
+     * deletes the work directory, reinstates the SecurityManager saved by
+     * {@code setup()} and clears the {@code isServerRunning} flag.
+     *
+     * @throws IOException declared for the cleanup calls
+     */
+    @AfterClass
+    public static void tearDown() throws IOException {
+        // mrCluster stays null when setup() failed before creating it.
+        if (mrCluster != null) {
+            mrCluster.shutdown();
+        }
+
+        FileUtil.fullyDelete(workDir);
+
+        System.setSecurityManager(securityManager);
+        isServerRunning = false;
+    }
+
+    /**
+     * Runs a job whose mapper always fails and verifies that nothing was
+     * published: the table has no partitions and the partition directory is
+     * absent from the table location.
+     */
+    @Test
+    public void testPartitionPublish() throws Exception {
+        final String dbName = "default";
+        final String tableName = "testHCatPartitionedTable";
+        createTable(null, tableName);
+
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value1");
+        partitionMap.put("part0", "p0value1");
+
+        // Loop variable is 'column' so it does not shadow the static FileSystem field 'fs'.
+        ArrayList<HCatFieldSchema> hcatTableColumns = new ArrayList<HCatFieldSchema>();
+        for (FieldSchema column : getTableColumns()) {
+            hcatTableColumns.add(HCatSchemaUtils.getHCatFieldSchema(column));
+        }
+
+        runMRCreateFail(dbName, tableName, partitionMap, hcatTableColumns);
+
+        // The failed job must not have registered any partition ...
+        List<String> ptns = msc.listPartitionNames(dbName, tableName, (short) 10);
+        Assert.assertEquals(0, ptns.size());
+
+        // ... while the table itself is still there.
+        Table table = msc.getTable(dbName, tableName);
+        Assert.assertNotNull(table);
+
+        // Also make sure that the directory has been deleted in the table location.
+        Path partitionDir =
+                new Path(table.getSd().getLocation() + "/part1=p1value1/part0=p0value1");
+        Assert.assertFalse(fs.exists(partitionDir));
+    }
+
+    /**
+     * Submits a map-only job that writes to the given table partition through
+     * {@code HCatOutputFormat} using {@link MapFail}, and asserts that the job
+     * does not complete successfully.
+     *
+     * @param dbName          database of the target table
+     * @param tableName       target table
+     * @param partitionValues partition key/value pairs passed to {@code OutputJobInfo.create}
+     * @param columns         columns used to build the output {@code HCatSchema}
+     * @throws Exception if the job cannot be set up or submitted
+     */
+    void runMRCreateFail(String dbName, String tableName, Map<String, String> partitionValues,
+            List<HCatFieldSchema> columns) throws Exception {
+        Job job = new Job(mrConf, "hcat mapreduce write fail test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(TestHCatPartitionPublish.MapFail.class);
+
+        // input/output settings
+        job.setInputFormatClass(TextInputFormat.class);
+
+        Path inputPath = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+        //The write count does not matter, as the map will fail in its first call.
+        createInputFile(inputPath, 5);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, tableName, partitionValues));
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        HCatOutputFormat.setSchema(job, new HCatSchema(columns));
+
+        // The mapper throws on its first record, so the job is expected to fail.
+        Assert.assertFalse(job.waitForCompletion(true));
+    }
+
+    /**
+     * Writes a small input file containing the numbers {@code 0..rowCount-1},
+     * one per line, replacing any existing file or directory at {@code path}.
+     *
+     * <p>The rows are written with {@code writeChars}, which emits each
+     * character as two bytes. The caller's mapper fails before looking at the
+     * data, so the exact encoding is not relied upon here.
+     *
+     * @param path     location of the input file on {@code fs}
+     * @param rowCount number of rows to write
+     * @throws IOException if the file cannot be deleted, created or written
+     */
+    private void createInputFile(Path path, int rowCount) throws IOException {
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
+        FSDataOutputStream os = fs.create(path);
+        try {
+            for (int i = 0; i < rowCount; i++) {
+                os.writeChars(i + "\n");
+            }
+        } finally {
+            // Close even when a write fails so the stream is not leaked.
+            os.close();
+        }
+    }
+
+ public static class MapFail extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context
+ ) throws IOException, InterruptedException {
+ {
+ throw new IOException("Exception to mimic job failure.");
+ }
+ }
+ }
+
+    /**
+     * Drops any existing table of this name, then creates a partitioned
+     * {@code MANAGED_TABLE} that uses RCFile input/output formats and
+     * {@code ColumnarSerDe}, with columns from {@link #getTableColumns()} and
+     * partition keys from {@link #getPartitionKeys()}.
+     *
+     * @param dbName    database name; {@code null} selects
+     *                  {@code MetaStoreUtils.DEFAULT_DATABASE_NAME}
+     * @param tableName name of the table to (re)create
+     * @throws Exception if the metastore call to create the table fails
+     */
+    private void createTable(String dbName, String tableName) throws Exception {
+        String databaseName = dbName;
+        if (databaseName == null) {
+            databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+        }
+
+        try {
+            msc.dropTable(databaseName, tableName);
+        } catch (Exception ignored) {
+            //can fail with NoSuchObjectException
+        }
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType("MANAGED_TABLE");
+        tbl.setPartitionKeys(getPartitionKeys());
+
+        // Serde description: ColumnarSerDe with serialization format "1".
+        SerDeInfo serdeInfo = new SerDeInfo();
+        serdeInfo.setName(tbl.getTableName());
+        Map<String, String> serdeParams = new HashMap<String, String>();
+        serdeParams.put(Constants.SERIALIZATION_FORMAT, "1");
+        serdeInfo.setParameters(serdeParams);
+        serdeInfo.setSerializationLib(ColumnarSerDe.class.getName());
+
+        // Storage description: the table columns stored as RCFile.
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(getTableColumns());
+        sd.setBucketCols(new ArrayList<String>(2));
+        sd.setSerdeInfo(serdeInfo);
+        sd.setInputFormat(RCFileInputFormat.class.getName());
+        sd.setOutputFormat(RCFileOutputFormat.class.getName());
+        tbl.setSd(sd);
+
+        tbl.setParameters(new HashMap<String, String>());
+
+        msc.createTable(tbl);
+    }
+
+    /**
+     * Returns the two string-typed partition keys of the test table.
+     *
+     * @return a new list holding the keys {@code PaRT1} and {@code part0}, in that order
+     */
+    protected List<FieldSchema> getPartitionKeys() {
+        //Defining partition names in unsorted order
+        // NOTE(review): the first key is mixed-case while the test passes "part1";
+        // presumably this exercises case-insensitive key handling - confirm.
+        FieldSchema firstKey = new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, "");
+        FieldSchema secondKey = new FieldSchema("part0", Constants.STRING_TYPE_NAME, "");
+
+        List<FieldSchema> keys = new ArrayList<FieldSchema>();
+        keys.add(firstKey);
+        keys.add(secondKey);
+        return keys;
+    }
+
+    /**
+     * Returns the data columns of the test table.
+     *
+     * @return a new list holding {@code c1} (int) and {@code c2} (string), in that order
+     */
+    protected List<FieldSchema> getTableColumns() {
+        FieldSchema intColumn = new FieldSchema("c1", Constants.INT_TYPE_NAME, "");
+        FieldSchema stringColumn = new FieldSchema("c2", Constants.STRING_TYPE_NAME, "");
+
+        List<FieldSchema> columns = new ArrayList<FieldSchema>();
+        columns.add(intColumn);
+        columns.add(stringColumn);
+        return columns;
+    }
+
+}
Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java Fri Aug 3 23:14:22 2012
@@ -167,7 +167,7 @@ public class TestSequenceFileReadWrite {
HCatOutputFormat.setSchema(job, getSchema());
job.setNumReduceTasks(0);
assertTrue(job.waitForCompletion(true));
- new FileOutputCommitterContainer(job, null).cleanupJob(job);
+ new FileOutputCommitterContainer(job, null).commitJob(job);
assertTrue(job.isSuccessful());
server.setBatchOn();
@@ -211,7 +211,7 @@ public class TestSequenceFileReadWrite {
job.setOutputFormatClass(HCatOutputFormat.class);
HCatOutputFormat.setSchema(job, getSchema());
assertTrue(job.waitForCompletion(true));
- new FileOutputCommitterContainer(job, null).cleanupJob(job);
+ new FileOutputCommitterContainer(job, null).commitJob(job);
assertTrue(job.isSuccessful());
server.setBatchOn();
@@ -254,4 +254,4 @@ public class TestSequenceFileReadWrite {
return schema;
}
-}
\ No newline at end of file
+}
Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1369248&r1=1369247&r2=1369248&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/pig/TestHCatStorer.java Fri Aug 3 23:14:22 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hcatalog.pig;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hive.ql.Command
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hcatalog.HcatTestUtils;
+import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
@@ -37,6 +39,7 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.LogUtils;
+import org.junit.Assert;
public class TestHCatStorer extends TestCase {
private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
@@ -639,4 +642,57 @@ public class TestHCatStorer extends Test
assertEquals(0, results.size());
driver.run("drop table employee");
}
+
+    /**
+     * Stores into a partitioned table through a Pig script whose filter function
+     * always throws, then checks that no partition was registered and that no
+     * partition directory was left under the table directory.
+     *
+     * @throws IOException if the table cannot be created or a query returns a
+     *                     non-zero response code
+     * @throws CommandNeedRetryException propagated from the Hive driver
+     */
+    public void testPartitionPublish() throws IOException, CommandNeedRetryException {
+        driver.run("drop table ptn_fail");
+        String createTable = "create table ptn_fail(a int, c string) partitioned by (b string) stored as RCFILE";
+        if (driver.run(createTable).getResponseCode() != 0) {
+            throw new IOException("Failed to create table.");
+        }
+
+        // Input rows: "<n>\tmath" for n = 0..10.
+        final int rowCount = 11;
+        String[] input = new String[rowCount];
+        for (int row = 0; row < rowCount; row++) {
+            input[row] = row + "\tmath";
+        }
+        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
+
+        // FailEvalFunc throws for every tuple it is given.
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("A = load '"+ INPUT_FILE_NAME+"' as (a:int, c:chararray);");
+        server.registerQuery("B = filter A by " + FailEvalFunc.class.getName() + "($0);");
+        server.registerQuery("store B into 'ptn_fail' using "+HCatStorer.class.getName()+"('b=math');");
+        server.executeBatch();
+
+        String query = "show partitions ptn_fail";
+        int retCode = driver.run(query).getResponseCode();
+        if (retCode != 0) {
+            throw new IOException("Error " + retCode + " running query " + query);
+        }
+
+        ArrayList<String> partitions = new ArrayList<String>();
+        driver.getResults(partitions);
+        assertEquals(0, partitions.size());
+
+        // The table directory must exist in the test warehouse, but the b=math
+        // partition directory must not. Both are checked through java.io.File.
+        File tableDir = new File(TEST_WAREHOUSE_DIR + "/ptn_fail");
+        File partitionDir = new File(TEST_WAREHOUSE_DIR + "/ptn_fail/b=math");
+        Assert.assertTrue(tableDir.exists());
+        Assert.assertFalse(partitionDir.exists());
+    }
+
+    /**
+     * Pig eval function that throws on every call; used as a filter to make
+     * the store in {@code testPartitionPublish} fail.
+     */
+    public static class FailEvalFunc extends EvalFunc<Boolean> {
+
+        /**
+         * Always fails.
+         *
+         * @param tuple input tuple (not inspected)
+         * @return never returns normally
+         * @throws IOException always, to mimic a failure
+         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public Boolean exec(Tuple tuple) throws IOException {
+            throw new IOException("Eval Func to mimic Failure.");
+        }
+    }
+
}