Posted to commits@hive.apache.org by ga...@apache.org on 2014/09/06 17:38:43 UTC

svn commit: r1622876 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/io/ java/org/apache/hadoop/hive/ql/plan/ test/org/apache/hadoop/hive/ql/exec/

Author: gates
Date: Sat Sep  6 15:38:43 2014
New Revision: 1622876

URL: http://svn.apache.org/r1622876
Log:
HIVE-7078 Need file sink operators that work with ACID (Alan Gates, reviewed by Prasanth J)

Added:
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1622876&r1=1622875&r2=1622876&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Sep  6 15:38:43 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -29,12 +30,15 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -62,6 +66,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -100,6 +105,10 @@ public class FileSinkOperator extends Te
   private transient List<Object> keyWritables;
   private transient List<String> keys;
   private transient int numKeyColToRead;
+  private StructField recIdField; // field to find record identifier in
+  private StructField bucketField; // field the bucket is in within the record id
+  private StructObjectInspector recIdInspector; // OI for inspecting record id
+  private IntObjectInspector bucketInspector; // OI for inspecting bucket id
 
   /**
    * RecordWriter.
@@ -117,7 +126,10 @@ public class FileSinkOperator extends Te
     Path[] outPaths;
     Path[] finalPaths;
     RecordWriter[] outWriters;
+    RecordUpdater[] updaters;
     Stat stat;
+    int acidLastBucket = -1;
+    int acidFileOffset = -1;
 
     public FSPaths() {
     }
@@ -128,6 +140,8 @@ public class FileSinkOperator extends Te
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
+      updaters = new RecordUpdater[numFiles];
+      LOG.debug("Created slots for  " + numFiles);
       stat = new Stat();
     }
 
@@ -168,6 +182,15 @@ public class FileSinkOperator extends Te
           }
         }
       }
+      try {
+        for (int i = 0; i < updaters.length; i++) {
+          if (updaters[i] != null) {
+            updaters[i].close(abort);
+          }
+        }
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
     }
 
     private void commit(FileSystem fs) throws HiveException {
@@ -177,7 +200,21 @@ public class FileSinkOperator extends Te
               && !fs.exists(finalPaths[idx].getParent())) {
             fs.mkdirs(finalPaths[idx].getParent());
           }
-          if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+          boolean needToRename = true;
+          if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+              conf.getWriteType() == AcidUtils.Operation.DELETE) {
+            // If we're updating or deleting there may be no file to close.  This can happen
+            // because the where clause filtered out all of the records for a given bucket.  So
+            // before attempting the rename below, check whether our file exists.  If it doesn't,
+            // skip the rename; if it does, try it.  We could just blindly try the rename
+            // and avoid the extra stat, but that would mask other errors.
+            try {
+              FileStatus stat = fs.getFileStatus(outPaths[idx]);
+            } catch (FileNotFoundException fnfe) {
+              needToRename = false;
+            }
+          }
+          if (needToRename && !fs.rename(outPaths[idx], finalPaths[idx])) {
             throw new HiveException("Unable to rename output from: " +
                 outPaths[idx] + " to: " + finalPaths[idx]);
           }
@@ -350,6 +387,16 @@ public class FileSinkOperator extends Te
           valToPaths.put("", fsp); // special entry for non-DP case
         }
       }
+
+      if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+          conf.getWriteType() == AcidUtils.Operation.DELETE) {
+        // ROW__ID is always in the first field
+        recIdField = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs().get(0);
+        recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector();
+        // bucket is the second field in the record id
+        bucketField = recIdInspector.getAllStructFieldRefs().get(1);
+        bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
+      }
       initializeChildren(hconf);
     } catch (HiveException e) {
       throw e;
@@ -420,6 +467,7 @@ public class FileSinkOperator extends Te
           assert totalFiles == 1;
         }
 
+        int bucketNum = 0;
         if (multiFileSpray) {
           key.setHashCode(idx);
 
@@ -436,7 +484,7 @@ public class FileSinkOperator extends Te
             }
           }
 
-          int bucketNum = prtner.getBucket(key, null, totalFiles);
+          bucketNum = prtner.getBucket(key, null, totalFiles);
           if (seenBuckets.contains(bucketNum)) {
             continue;
           }
@@ -462,7 +510,8 @@ public class FileSinkOperator extends Te
     filesCreated = true;
   }
 
-  protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException {
+  protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
+      throws HiveException {
     try {
       if (isNativeTable) {
         fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
@@ -493,11 +542,21 @@ public class FileSinkOperator extends Te
       Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
       // only create bucket files if no dynamic partitions,
       // buckets of dynamic partitions will be created for each newly created partition
-      fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
-          outputClass, conf, fsp.outPaths[filesIdx], reporter);
-      // If the record writer provides stats, get it from there instead of the serde
-      statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
-      // increment the CREATED_FILES counter
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+        fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
+            outputClass, conf, fsp.outPaths[filesIdx], reporter);
+        // If the record writer provides stats, get it from there instead of the serde
+        statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof
+            StatsProvidingRecordWriter;
+        // increment the CREATED_FILES counter
+      } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+        // Only set up the updater for insert.  For update and delete we don't know until we see
+        // the row.
+        ObjectInspector inspector = bDynParts ? subSetOI : outputObjInspector;
+        int acidBucketNum = Integer.valueOf(Utilities.getTaskIdFromFilename(taskId));
+        fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(),
+            acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1);
+      }
       if (reporter != null) {
         reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
             Operator.HIVECOUNTERCREATEDFILES, 1);
@@ -598,27 +657,47 @@ public class FileSinkOperator extends Te
       }
 
 
-      RecordWriter rowOutWriter = null;
-
       if (row_count != null) {
         row_count.set(row_count.get() + 1);
       }
 
-      if (!multiFileSpray) {
-        rowOutWriter = rowOutWriters[0];
+      int writerOffset = findWriterOffset(row);
+      // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
+      // for a given operator, branch prediction should work quite nicely on it.
+      // RecordUpdater expects to get the actual row, not a serialized version of it.  Thus we
+      // pass the row rather than recordValue.
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+        rowOutWriters[writerOffset].write(recordValue);
+      } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+        fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
       } else {
-        int keyHashCode = 0;
-        for (int i = 0; i < partitionEval.length; i++) {
-          Object o = partitionEval[i].evaluate(row);
-          keyHashCode = keyHashCode * 31
-              + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
-        }
-        key.setHashCode(keyHashCode);
-        int bucketNum = prtner.getBucket(key, null, totalFiles);
-        int idx = bucketMap.get(bucketNum);
-        rowOutWriter = rowOutWriters[idx];
+        // TODO I suspect we could skip much of the code above this point in the case of update
+        // and delete.  But I don't understand all of the side effects of that code and don't
+        // want to skip over it yet.
+
+        // Find the bucket id, and switch buckets if need to
+        ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector;
+        Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
+        int bucketNum =
+            bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
+        if (fpaths.acidLastBucket != bucketNum) {
+          fpaths.acidLastBucket = bucketNum;
+          // Switch files
+          fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+              jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset],
+              rowInspector, reporter, 0);
+          LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
+              fpaths.outPaths[fpaths.acidFileOffset]);
+        }
+
+        if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
+          fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+        } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
+          fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+        } else {
+          throw new HiveException("Unknown write type " + conf.getWriteType().toString());
+        }
       }
-      rowOutWriter.write(recordValue);
     } catch (IOException e) {
       throw new HiveException(e);
     } catch (SerDeException e) {
@@ -627,6 +706,11 @@ public class FileSinkOperator extends Te
   }
 
   protected boolean areAllTrue(boolean[] statsFromRW) {
+    // If we are doing an ACID operation they will always all be true, as RecordUpdaters always
+    // collect stats.
+    if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID) {
+      return true;
+    }
     for(boolean b : statsFromRW) {
       if (!b) {
         return false;
@@ -635,6 +719,23 @@ public class FileSinkOperator extends Te
     return true;
   }
 
+  private int findWriterOffset(Object row) throws HiveException {
+    if (!multiFileSpray) {
+      return 0;
+    } else {
+      int keyHashCode = 0;
+      for (int i = 0; i < partitionEval.length; i++) {
+        Object o = partitionEval[i].evaluate(row);
+        keyHashCode = keyHashCode * 31
+            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      }
+      key.setHashCode(keyHashCode);
+      int bucketNum = prtner.getBucket(key, null, totalFiles);
+      return bucketMap.get(bucketNum);
+    }
+
+  }
+
   /**
    * Lookup list bucketing path.
    * @param lbDirName
@@ -727,14 +828,16 @@ public class FileSinkOperator extends Te
     FSPaths fp;
 
     // get the path corresponding to the dynamic partition columns,
-    String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
+    String dpDir = getDynPartDirectory(row, dpColNames);
 
     String pathKey = null;
     if (dpDir != null) {
       dpDir = appendToSource(lbDirName, dpDir);
       pathKey = dpDir;
+      int numericBucketNum = 0;
       if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
         String buckNum = row.get(row.size() - 1);
+        numericBucketNum = Integer.valueOf(buckNum);
         taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), buckNum);
         pathKey = appendToSource(taskId, dpDir);
       }
@@ -756,13 +859,18 @@ public class FileSinkOperator extends Te
           // since we are closing the previous fsp's record writers, we need to see if we can get
           // stats from the record writer and store in the previous fsp that is cached
           if (conf.isGatherStats() && isCollectRWStats) {
-            RecordWriter outWriter = prevFsp.outWriters[0];
-            if (outWriter != null) {
-              SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
-              if (stats != null) {
+            SerDeStats stats = null;
+            if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+              RecordWriter outWriter = prevFsp.outWriters[0];
+              if (outWriter != null) {
+                stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+              }
+            } else if (prevFsp.updaters[0] != null) {
+              stats = prevFsp.updaters[0].getStats();
+            }
+            if (stats != null) {
                 prevFsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
                 prevFsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
-              }
             }
           }
 
@@ -805,8 +913,7 @@ public class FileSinkOperator extends Te
   // given the current input row, the mapping for input col info to dp columns, and # of dp cols,
   // return the relative path corresponding to the row.
   // e.g., ds=2008-04-08/hr=11
-  private String getDynPartDirectory(List<String> row, List<String> dpColNames, int numDynParts) {
-    assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns";
+  private String getDynPartDirectory(List<String> row, List<String> dpColNames) {
     return FileUtils.makePartName(dpColNames, row);
   }
 
@@ -832,6 +939,7 @@ public class FileSinkOperator extends Te
 
   @Override
   public void closeOp(boolean abort) throws HiveException {
+
     if (!bDynParts && !filesCreated) {
       createBucketFiles(fsp);
     }
@@ -849,13 +957,25 @@ public class FileSinkOperator extends Te
         // record writer already gathers the statistics, it can simply return the
         // accumulated statistics which will be aggregated in case of spray writers
         if (conf.isGatherStats() && isCollectRWStats) {
-          for (int idx = 0; idx < fsp.outWriters.length; idx++) {
-            RecordWriter outWriter = fsp.outWriters[idx];
-            if (outWriter != null) {
-              SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
-              if (stats != null) {
-                fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
-                fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+          if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+            for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+              RecordWriter outWriter = fsp.outWriters[idx];
+              if (outWriter != null) {
+                SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+                if (stats != null) {
+                  fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+                  fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+                }
+              }
+            }
+          } else {
+            for (int i = 0; i < fsp.updaters.length; i++) {
+              if (fsp.updaters[i] != null) {
+                SerDeStats stats = fsp.updaters[i].getStats();
+                if (stats != null) {
+                  fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+                  fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+                }
               }
             }
           }
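
The heart of the FileSinkOperator change is the new per-row dispatch in processOp(): non-ACID writes keep the old RecordWriter path, inserts go through a RecordUpdater created up front in createBucketForFileIdx(), and updates/deletes pull the bucket id out of the ROW__ID struct and open an updater per bucket as each bucket is first seen. The sketch below condenses that dispatch from the diff above, with error handling and dynamic-partition bookkeeping stripped out; treat it as an illustration rather than the committed code.

    // Condensed restatement of the new write dispatch in processOp() (see the full diff above).
    int writerOffset = findWriterOffset(row);
    if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
      // Old path: write the serialized record through the plain RecordWriter.
      rowOutWriters[writerOffset].write(recordValue);
    } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
      // RecordUpdater wants the raw row, not the serialized recordValue.
      fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
    } else {
      // UPDATE/DELETE: ROW__ID is the first field of the row and the bucket id is the
      // second field of the record identifier; switch updaters whenever the bucket changes.
      ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector;
      Object recId = ((StructObjectInspector) rowInspector).getStructFieldData(row, recIdField);
      int bucketNum = bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
      if (fpaths.acidLastBucket != bucketNum) {
        fpaths.acidLastBucket = bucketNum;
        fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
            jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset],
            rowInspector, reporter, 0);
      }
      if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
        fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row);
      } else {
        fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
      }
    }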

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1622876&r1=1622875&r2=1622876&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Sat Sep  6 15:38:43 2014
@@ -164,6 +164,8 @@ public class AcidUtils {
     return result;
   }
 
+  public enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }
+
   public static interface Directory {
 
     /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1622876&r1=1622875&r2=1622876&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Sat Sep  6 15:38:43 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -249,21 +250,8 @@ public final class HiveFileFormatUtils {
   public static RecordWriter getHiveRecordWriter(JobConf jc,
       TableDesc tableInfo, Class<? extends Writable> outputClass,
       FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
-    boolean storagehandlerofhivepassthru = false;
-    HiveOutputFormat<?, ?> hiveOutputFormat;
+    HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
     try {
-      if (tableInfo.getJobProperties() != null) {
-        if (tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
-            jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
-            storagehandlerofhivepassthru  = true;
-         }
-      }
-      if (storagehandlerofhivepassthru) {
-         hiveOutputFormat = ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(),jc);
-      }
-      else {
-         hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance();
-      }
       boolean isCompressed = conf.getCompressed();
       JobConf jc_output = jc;
       if (isCompressed) {
@@ -299,6 +287,73 @@ public final class HiveFileFormatUtils {
     return null;
   }
 
+  private static HiveOutputFormat<?, ?> getHiveOutputFormat(JobConf jc, TableDesc tableInfo)
+      throws HiveException {
+    boolean storagehandlerofhivepassthru = false;
+    HiveOutputFormat<?, ?> hiveOutputFormat;
+    try {
+      if (tableInfo.getJobProperties() != null) {
+        if (tableInfo.getJobProperties().get(
+            HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
+          jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,
+              tableInfo.getJobProperties()
+                  .get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
+          storagehandlerofhivepassthru = true;
+        }
+      }
+      if (storagehandlerofhivepassthru) {
+        return ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(), jc);
+      } else {
+        return tableInfo.getOutputFileFormatClass().newInstance();
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket,
+                                                   FileSinkDesc conf, Path outPath,
+                                                   ObjectInspector inspector,
+                                                   Reporter reporter, int rowIdColNum)
+      throws HiveException, IOException {
+    HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
+    AcidOutputFormat<?, ?> acidOutputFormat = null;
+    if (hiveOutputFormat instanceof AcidOutputFormat) {
+      acidOutputFormat = (AcidOutputFormat)hiveOutputFormat;
+    } else {
+      throw new HiveException("Unable to create RecordUpdater for HiveOutputFormat that does not " +
+          "implement AcidOutputFormat");
+    }
+    // TODO not 100% sure about this.  This call doesn't set the compression type in the conf
+    // the way getHiveRecordWriter does, as ORC appears to read the value for itself.  Not
+    // sure whether this is correct.
+    return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(),
+        bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum);
+  }
+
+
+  private static RecordUpdater getRecordUpdater(JobConf jc,
+                                                AcidOutputFormat<?, ?> acidOutputFormat,
+                                                boolean isCompressed,
+                                                long txnId,
+                                                int bucket,
+                                                ObjectInspector inspector,
+                                                Properties tableProp,
+                                                Path outPath,
+                                                Reporter reporter,
+                                                int rowIdColNum) throws IOException {
+    return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
+        .isCompressed(isCompressed)
+        .tableProperties(tableProp)
+        .reporter(reporter)
+        .writingBase(false)
+        .minimumTransactionId(txnId)
+        .maximumTransactionId(txnId)
+        .bucket(bucket)
+        .inspector(inspector)
+        .recordIdColumn(rowIdColNum));
+  }
+
   public static PartitionDesc getPartitionDescFromPathRecursively(
       Map<String, PartitionDesc> pathToPartitionInfo, Path dir,
       Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cacheMap)
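
For readers following the new helper, the Options object built in the private getRecordUpdater() is simply a mapping from the file-sink settings onto AcidOutputFormat. The sketch below makes that mapping explicit; it restates the method above with comments noting where each value comes from, and adds no behavior of its own.

    // Sketch: how FileSinkDesc/TableDesc settings flow into AcidOutputFormat.Options
    // (restates the private getRecordUpdater() above).
    AcidOutputFormat.Options options = new AcidOutputFormat.Options(jc)
        .isCompressed(conf.getCompressed())             // from FileSinkDesc
        .tableProperties(tableInfo.getProperties())     // from TableDesc
        .reporter(reporter)
        .writingBase(false)                             // file sinks write deltas, never a base
        .minimumTransactionId(conf.getTransactionId())  // single-transaction delta:
        .maximumTransactionId(conf.getTransactionId())  //   min txn == max txn
        .bucket(bucket)
        .inspector(inspector)
        .recordIdColumn(rowIdColNum);                   // -1 for INSERT, 0 for UPDATE/DELETE
    RecordUpdater updater = acidOutputFormat.getRecordUpdater(outPath, options);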

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=1622876&r1=1622875&r2=1622876&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Sat Sep  6 15:38:43 2014
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 
 /**
  * FileSinkDesc.
@@ -84,6 +85,10 @@ public class FileSinkDesc extends Abstra
 
   private boolean statsCollectRawDataSize;
 
+  // Record what type of write this is.  Default is non-ACID (i.e. old style).
+  private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
+  private long txnId = 0;  // transaction id for this operation
+
   public FileSinkDesc() {
   }
 
@@ -137,6 +142,8 @@ public class FileSinkDesc extends Abstra
     ret.setMaxStatsKeyPrefixLength(maxStatsKeyPrefixLength);
     ret.setStatsCollectRawDataSize(statsCollectRawDataSize);
     ret.setDpSortState(dpSortState);
+    ret.setWriteType(writeType);
+    ret.setTransactionId(txnId);
     return (Object) ret;
   }
 
@@ -398,4 +405,20 @@ public class FileSinkDesc extends Abstra
   public void setDpSortState(DPSortState dpSortState) {
     this.dpSortState = dpSortState;
   }
+
+  public void setWriteType(AcidUtils.Operation type) {
+    writeType = type;
+  }
+
+  public AcidUtils.Operation getWriteType() {
+    return writeType;
+  }
+
+  public void setTransactionId(long id) {
+    txnId = id;
+  }
+
+  public long getTransactionId() {
+    return txnId;
+  }
 }
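
To see how the new FileSinkDesc fields are meant to be driven, the fragment below mirrors what the new TestFileSinkOperator (added below) does in its getFileSink() helper; the path, table descriptor, and transaction id are illustrative values, not anything the planner mandates.

    // Illustrative only: mark a file sink as an ACID update running in transaction 2.
    FileSinkDesc desc = new FileSinkDesc(outputPath, tableDesc, false);
    desc.setWriteType(AcidUtils.Operation.UPDATE);  // default is NOT_ACID, i.e. the old path
    desc.setTransactionId(2);                       // recorded as min/max txn of the delta
    desc.setGatherStats(true);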

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java?rev=1622876&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java Sat Sep  6 15:38:43 2014
@@ -0,0 +1,757 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Tests for {@link org.apache.hadoop.hive.ql.exec.FileSinkOperator}
+ */
+public class TestFileSinkOperator {
+  private static String PARTCOL_NAME = "partval";
+  static final private Log LOG = LogFactory.getLog(TestFileSinkOperator.class.getName());
+
+  private static File tmpdir;
+  private static TableDesc nonAcidTableDescriptor;
+  private static TableDesc acidTableDescriptor;
+  private static ObjectInspector inspector;
+  private static List<TFSORow> rows;
+  private static ValidTxnList txnList;
+
+  private Path basePath;
+  private JobConf jc;
+
+  @BeforeClass
+  public static void classSetup() {
+    Properties properties = new Properties();
+    properties.setProperty(serdeConstants.SERIALIZATION_LIB, TFSOSerDe.class.getName());
+    nonAcidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
+    properties = new Properties(properties);
+    properties.setProperty(hive_metastoreConstants.BUCKET_COUNT, "1");
+    acidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
+
+    tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") +
+        "testFileSinkOperator");
+    tmpdir.mkdir();
+    tmpdir.deleteOnExit();
+    txnList = new ValidTxnListImpl(new long[]{}, 2);
+  }
+
+  @Test
+  public void testNonAcidWrite() throws Exception {
+    setBasePath("write");
+    setupData(DataFormat.SIMPLE);
+    FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, false, 0);
+    processRows(op);
+    confirmOutput();
+  }
+
+  @Test
+  public void testInsert() throws Exception {
+    setBasePath("insert");
+    setupData(DataFormat.SIMPLE);
+    FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, false, 1);
+    processRows(op);
+    Assert.assertEquals("10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+    confirmOutput();
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    setBasePath("update");
+    setupData(DataFormat.WITH_RECORD_ID);
+    FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, false, 2);
+    processRows(op);
+    Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+    confirmOutput();
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    setBasePath("delete");
+    setupData(DataFormat.WITH_RECORD_ID);
+    FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, false, 2);
+    processRows(op);
+    Assert.assertEquals("-10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+    confirmOutput();
+  }
+
+  @Test
+  public void testNonAcidDynamicPartitioning() throws Exception {
+    setBasePath("writeDP");
+    setupData(DataFormat.WITH_PARTITION_VALUE);
+    FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, true, 0);
+    processRows(op);
+    confirmOutput();
+  }
+
+
+  @Test
+  public void testInsertDynamicPartitioning() throws Exception {
+    setBasePath("insertDP");
+    setupData(DataFormat.WITH_PARTITION_VALUE);
+    FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, true, 1);
+    processRows(op);
+    // We only expect 5 here because we'll get whichever of the partitions published its stats
+    // last.
+    Assert.assertEquals("5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+    confirmOutput();
+  }
+
+  @Test
+  public void testUpdateDynamicPartitioning() throws Exception {
+    setBasePath("updateDP");
+    setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
+    FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, true, 2);
+    processRows(op);
+    Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+    confirmOutput();
+  }
+
+  @Test
+  public void testDeleteDynamicPartitioning() throws Exception {
+    setBasePath("deleteDP");
+    setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
+    FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, true, 2);
+    processRows(op);
+    // We only expect -5 here because we'll get whichever of the partitions published its stats
+    // last.
+    Assert.assertEquals("-5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+    confirmOutput();
+  }
+
+
+  @Before
+  public void setup() throws Exception {
+    jc = new JobConf();
+    jc.set(StatsSetupConst.STATS_TMP_LOC, File.createTempFile("TestFileSinkOperator",
+        "stats").getPath());
+    jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER.varname,
+        TFSOStatsPublisher.class.getName());
+    jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR.varname,
+        TFSOStatsAggregator.class.getName());
+    jc.set(HiveConf.ConfVars.HIVESTATSDBCLASS.varname, "custom");
+  }
+
+  private void setBasePath(String testName) {
+    basePath = new Path(new File(tmpdir, testName).getPath());
+
+  }
+
+  private enum DataFormat {SIMPLE, WITH_RECORD_ID, WITH_PARTITION_VALUE,
+    WITH_RECORD_ID_AND_PARTITION_VALUE};
+
+  private void setupData(DataFormat format) {
+
+    // Build object inspector
+    inspector = ObjectInspectorFactory.getReflectionObjectInspector
+        (TFSORow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    rows = new ArrayList<TFSORow>();
+
+    switch (format) {
+      case SIMPLE:
+        // Build rows
+        for (int i = 0; i < 10; i++) {
+          rows.add(
+              new TFSORow(
+                  new Text("mary had a little lamb")
+              )
+          );
+        }
+        break;
+
+      case WITH_RECORD_ID:
+        for (int i = 0; i < 10; i++) {
+          rows.add(
+              new TFSORow(
+                  new Text("its fleece was white as snow"),
+                  new RecordIdentifier(1, 1, i)
+              )
+          );
+        }
+        break;
+
+      case WITH_PARTITION_VALUE:
+        for (int i = 0; i < 10; i++) {
+          rows.add(
+              new TFSORow(
+                  new Text("its fleece was white as snow"),
+                  (i < 5) ? new Text("Monday") : new Text("Tuesday")
+              )
+          );
+        }
+        break;
+
+      case WITH_RECORD_ID_AND_PARTITION_VALUE:
+        for (int i = 0; i < 10; i++) {
+          rows.add(
+              new TFSORow(
+                  new Text("its fleece was white as snow"),
+                  (i < 5) ? new Text("Monday") : new Text("Tuesday"),
+                  new RecordIdentifier(1, 1, i)
+              )
+          );
+        }
+        break;
+
+      default:
+        throw new RuntimeException("Unknown option!");
+    }
+  }
+
+  private FileSinkOperator getFileSink(AcidUtils.Operation writeType,
+                                       boolean dynamic,
+                                       long txnId) throws IOException, HiveException {
+    TableDesc tableDesc = null;
+    switch (writeType) {
+      case DELETE:
+      case UPDATE:
+      case INSERT:
+        tableDesc = acidTableDescriptor;
+        break;
+
+      case NOT_ACID:
+        tableDesc = nonAcidTableDescriptor;
+        break;
+    }
+    FileSinkDesc desc = null;
+    if (dynamic) {
+      ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>(1);
+      partCols.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, PARTCOL_NAME, "a", true));
+      Map<String, String> partColMap= new LinkedHashMap<String, String>(1);
+      partColMap.put(PARTCOL_NAME, null);
+      DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
+      Map<String, String> partColNames = new HashMap<String, String>(1);
+      partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
+      dpCtx.setInputToDPCols(partColNames);
+      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx);
+    } else {
+      desc = new FileSinkDesc(basePath, tableDesc, false);
+    }
+    desc.setWriteType(writeType);
+    desc.setGatherStats(true);
+    if (txnId > 0) desc.setTransactionId(txnId);
+    if (writeType != AcidUtils.Operation.NOT_ACID) desc.setTransactionId(1L);
+
+    FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class);
+    op.setConf(desc);
+    op.initialize(jc, new ObjectInspector[]{inspector});
+    return op;
+  }
+
+  private void processRows(FileSinkOperator op) throws HiveException {
+    for (TFSORow r : rows) op.processOp(r, 0);
+    op.jobCloseOp(jc, true);
+    op.close(false);
+  }
+
+  private void confirmOutput() throws IOException, SerDeException {
+    Path[] paths = findFilesInBasePath();
+    TFSOInputFormat input = new TFSOInputFormat();
+    FileInputFormat.setInputPaths(jc, paths);
+
+    InputSplit[] splits = input.getSplits(jc, 1);
+    RecordReader<NullWritable, TFSORow> reader = input.getRecordReader(splits[0], jc,
+        Mockito.mock(Reporter.class));
+    NullWritable key = reader.createKey();
+    TFSORow value = reader.createValue();
+    List<TFSORow> results = new ArrayList<TFSORow>(rows.size());
+    List<TFSORow> sortedRows = new ArrayList<TFSORow>(rows.size());
+    for (int i = 0; i < rows.size(); i++) {
+      Assert.assertTrue(reader.next(key, value));
+      results.add(new TFSORow(value));
+      sortedRows.add(new TFSORow(rows.get(i)));
+    }
+    Assert.assertFalse(reader.next(key, value));
+    Collections.sort(results);
+    Collections.sort(sortedRows);
+    for (int i = 0; i < rows.size(); i++) {
+      Assert.assertTrue(sortedRows.get(i).equals(results.get(i)));
+    }
+
+  }
+
+  private Path[] findFilesInBasePath() throws IOException {
+    Path parent = basePath.getParent();
+    String last = basePath.getName();
+    Path tmpPath = new Path(parent, "_tmp." + last);
+    FileSystem fs = basePath.getFileSystem(jc);
+    List<Path> paths = new ArrayList<Path>();
+    recurseOnPath(tmpPath, fs, paths);
+    return paths.toArray(new Path[paths.size()]);
+  }
+
+  private void recurseOnPath(Path p, FileSystem fs, List<Path> paths) throws IOException {
+    if (fs.getFileStatus(p).isDir()) {
+      FileStatus[] stats = fs.listStatus(p);
+      for (FileStatus stat : stats) recurseOnPath(stat.getPath(), fs, paths);
+    } else {
+      paths.add(p);
+    }
+  }
+
+  private static class TFSORow implements WritableComparable<TFSORow> {
+    private RecordIdentifier recId;
+    private Text data;
+    private Text partVal;
+
+    TFSORow() {
+      this(null, null, null);
+    }
+
+    TFSORow(Text t) {
+      this(t, null, null);
+    }
+
+    TFSORow(Text t, Text pv) {
+      this(t, pv, null);
+    }
+
+    TFSORow(Text t, RecordIdentifier ri) {
+      this(t, null, ri);
+    }
+
+    TFSORow(Text t, Text pv, RecordIdentifier ri) {
+      data = t;
+      partVal = pv;
+      recId = ri;
+
+    }
+
+    TFSORow(TFSORow other) {
+      this(other.data, other.partVal, other.recId);
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      data.write(dataOutput);
+      if (partVal == null) {
+        dataOutput.writeBoolean(false);
+      } else {
+        dataOutput.writeBoolean(true);
+        partVal.write(dataOutput);
+      }
+      if (recId == null) {
+        dataOutput.writeBoolean(false);
+      } else {
+        dataOutput.writeBoolean(true);
+        recId.write(dataOutput);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      data = new Text();
+      data.readFields(dataInput);
+      boolean notNull = dataInput.readBoolean();
+      if (notNull) {
+        partVal = new Text();
+        partVal.readFields(dataInput);
+      }
+      notNull = dataInput.readBoolean();
+      if (notNull) {
+        recId = new RecordIdentifier();
+        recId.readFields(dataInput);
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof TFSORow) {
+        TFSORow other = (TFSORow) obj;
+        if (data == null && other.data == null) return checkPartVal(other);
+        else if (data == null) return false;
+        else if (data.equals(other.data)) return checkPartVal(other);
+        else return false;
+      } else {
+        return false;
+      }
+    }
+
+    private boolean checkPartVal(TFSORow other) {
+      if (partVal == null && other.partVal == null) return checkRecId(other);
+      else if (partVal == null) return false;
+      else if (partVal.equals(other.partVal)) return checkRecId(other);
+      else return false;
+    }
+
+    private boolean checkRecId(TFSORow other) {
+      if (recId == null && other.recId == null) return true;
+      else if (recId == null) return false;
+      else return recId.equals(other.recId);
+    }
+
+    @Override
+    public int compareTo(TFSORow other) {
+      if (recId == null && other.recId == null) {
+        return comparePartVal(other);
+      } else if (recId == null) {
+        return -1;
+      } else {
+        int rc = recId.compareTo(other.recId);
+        if (rc == 0) return comparePartVal(other);
+        else return rc;
+      }
+    }
+
+    private int comparePartVal(TFSORow other) {
+      if (partVal == null && other.partVal == null) {
+        return compareData(other);
+      } else if (partVal == null) {
+        return -1;
+      } else {
+        int rc = partVal.compareTo(other.partVal);
+        if (rc == 0) return compareData(other);
+        else return rc;
+      }
+    }
+
+    private int compareData(TFSORow other) {
+      if (data == null && other.data == null) return 0;
+      else if (data == null) return -1;
+      else return data.compareTo(other.data);
+    }
+  }
+
+  private static class TFSOInputFormat extends FileInputFormat<NullWritable, TFSORow>
+                                       implements AcidInputFormat<NullWritable, TFSORow> {
+
+    FSDataInputStream[] in = null;
+    int readingFrom = -1;
+
+    @Override
+    public RecordReader<NullWritable, TFSORow> getRecordReader(
+        InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
+      if (in == null) {
+        Path paths[] = FileInputFormat.getInputPaths(entries);
+        in = new FSDataInputStream[paths.length];
+        FileSystem fs = paths[0].getFileSystem(entries);
+        for (int i = 0; i < paths.length; i++) {
+          in[i] = fs.open(paths[i]);
+        }
+        readingFrom = 0;
+      }
+      return new RecordReader<NullWritable, TFSORow>() {
+
+        @Override
+        public boolean next(NullWritable nullWritable, TFSORow tfsoRecord) throws
+            IOException {
+          try {
+            tfsoRecord.readFields(in[readingFrom]);
+            return true;
+          } catch (EOFException e) {
+            in[readingFrom].close();
+            if (++readingFrom >= in.length) return false;
+            else return next(nullWritable, tfsoRecord);
+          }
+        }
+
+        @Override
+        public NullWritable createKey() {
+          return NullWritable.get();
+        }
+
+        @Override
+        public TFSORow createValue() {
+          return new TFSORow();
+        }
+
+        @Override
+        public long getPos() throws IOException {
+          return 0L;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+          return 0.0f;
+        }
+      };
+    }
+
+    @Override
+    public RowReader<TFSORow> getReader(InputSplit split,
+                                           Options options) throws
+        IOException {
+      return null;
+    }
+
+    @Override
+    public RawReader<TFSORow> getRawReader(Configuration conf,
+                                              boolean collapseEvents,
+                                              int bucket,
+                                              ValidTxnList validTxnList,
+                                              Path baseDirectory,
+                                              Path[] deltaDirectory) throws
+        IOException {
+      return null;
+    }
+
+    @Override
+    public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws
+        IOException {
+      return false;
+    }
+  }
+
+  public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, TFSORow>
+      implements AcidOutputFormat<NullWritable, TFSORow> {
+    List<TFSORow> records = new ArrayList<TFSORow>();
+    long numRecordsAdded = 0;
+    FSDataOutputStream out = null;
+
+    @Override
+    public RecordUpdater getRecordUpdater(final Path path, final Options options) throws
+        IOException {
+
+      final StructObjectInspector inspector = (StructObjectInspector)options.getInspector();
+      return new RecordUpdater() {
+        @Override
+        public void insert(long currentTransaction, Object row) throws IOException {
+          addRow(row);
+          numRecordsAdded++;
+        }
+
+        @Override
+        public void update(long currentTransaction, Object row) throws IOException {
+          addRow(row);
+        }
+
+        @Override
+        public void delete(long currentTransaction, Object row) throws IOException {
+          addRow(row);
+          numRecordsAdded--;
+        }
+
+        private void addRow(Object row) {
+          assert row instanceof TFSORow : "Expected TFSORow but got " +
+              row.getClass().getName();
+          records.add((TFSORow)row);
+        }
+
+        @Override
+        public void flush() throws IOException {
+          if (out == null) {
+            FileSystem fs = path.getFileSystem(options.getConfiguration());
+            out = fs.create(path);
+          }
+          for (TFSORow r : records) r.write(out);
+          records.clear();
+          out.flush();
+        }
+
+        @Override
+        public void close(boolean abort) throws IOException {
+          flush();
+          out.close();
+        }
+
+        @Override
+        public SerDeStats getStats() {
+          SerDeStats stats = new SerDeStats();
+          stats.setRowCount(numRecordsAdded);
+          return stats;
+        }
+      };
+    }
+
+    @Override
+    public FileSinkOperator.RecordWriter getRawRecordWriter(Path path,
+                                                            Options options) throws
+        IOException {
+      return null;
+    }
+
+    @Override
+    public FileSinkOperator.RecordWriter getHiveRecordWriter(final JobConf jc,
+                                                             final Path finalOutPath,
+                                                             Class<? extends Writable> valueClass,
+                                                             boolean isCompressed,
+                                                             Properties tableProperties,
+                                                             Progressable progress)
+        throws IOException {
+      return new FileSinkOperator.RecordWriter() {
+        @Override
+        public void write(Writable w) throws IOException {
+          Assert.assertTrue(w instanceof TFSORow);
+          records.add((TFSORow) w);
+        }
+
+        @Override
+        public void close(boolean abort) throws IOException {
+          if (out == null) {
+            FileSystem fs = finalOutPath.getFileSystem(jc);
+            out = fs.create(finalOutPath);
+          }
+          for (TFSORow r : records) r.write(out);
+          records.clear();
+          out.flush();
+          out.close();
+        }
+      };
+    }
+
+    @Override
+    public RecordWriter<NullWritable, TFSORow> getRecordWriter(
+        FileSystem fileSystem, JobConf entries, String s, Progressable progressable) throws
+        IOException {
+      return null;
+    }
+
+    @Override
+    public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException {
+
+    }
+  }
+
+  public static class TFSOSerDe implements SerDe {
+
+    @Override
+    public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+
+    }
+
+    @Override
+    public Class<? extends Writable> getSerializedClass() {
+      return TFSORow.class;
+    }
+
+    @Override
+    public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+      assert obj instanceof TFSORow : "Expected TFSORow or descendant, got "
+          + obj.getClass().getName();
+      return (TFSORow)obj;
+    }
+
+    @Override
+    public Object deserialize(Writable blob) throws SerDeException {
+      assert blob instanceof TFSORow : "Expected TFSORow or descendant, got "
+          + blob.getClass().getName();
+      return blob;
+    }
+
+    @Override
+    public ObjectInspector getObjectInspector() throws SerDeException {
+      return null;
+    }
+
+    @Override
+    public SerDeStats getSerDeStats() {
+      return null;
+    }
+  }
+
+  public static class TFSOStatsPublisher implements StatsPublisher {
+    static Map<String, String> stats;
+
+    @Override
+    public boolean init(Configuration hconf) {
+      return true;
+    }
+
+    @Override
+    public boolean connect(Configuration hconf) {
+      return true;
+    }
+
+    @Override
+    public boolean publishStat(String fileID, Map<String, String> stats) {
+      this.stats = stats;
+      return true;
+    }
+
+    @Override
+    public boolean closeConnection() {
+      return true;
+    }
+  }
+
+  public static class TFSOStatsAggregator implements StatsAggregator {
+
+    @Override
+    public boolean connect(Configuration hconf, Task sourceTask) {
+      return true;
+    }
+
+    @Override
+    public String aggregateStats(String keyPrefix, String statType) {
+      return null;
+    }
+
+    @Override
+    public boolean closeConnection() {
+      return true;
+    }
+
+    @Override
+    public boolean cleanUp(String keyPrefix) {
+      return true;
+    }
+  }
+}
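
A note on the stats assertions in the tests above: the mock RecordUpdater counts +1 per insert, 0 per update, and -1 per delete, and TFSOStatsPublisher simply keeps the last map it is handed, so writing 10 rows yields ROW_COUNT values of "10", "0", and "-10" for insert, update, and delete. In the dynamic-partitioning cases the 10 rows split 5/5 across the Monday and Tuesday partitions, and only the last partition's count ("5" or "-5") survives, which is what the comments in those tests allude to.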