Posted to commits@hive.apache.org by ha...@apache.org on 2015/10/01 20:42:19 UTC

hive git commit: HIVE-11972 : [Refactor] Improve determination of dynamic partitioning columns in FileSink Operator (Ashutosh Chauhan via Prasanth J)

Repository: hive
Updated Branches:
  refs/heads/master 522bb600b -> 24988f77f


HIVE-11972 : [Refactor] Improve determination of dynamic partitioning columns in FileSink Operator (Ashutosh Chauhan via Prasanth J)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/24988f77
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/24988f77
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/24988f77

Branch: refs/heads/master
Commit: 24988f77f2898bbcd91f5665b865bcc251e3cade
Parents: 522bb60
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Sat Sep 26 12:19:00 2015 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Oct 1 11:41:53 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  19 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  17 +
 .../optimizer/ConstantPropagateProcFactory.java |  11 +-
 .../hive/ql/optimizer/GenMapRedUtils.java       |  10 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  30 +-
 .../hive/ql/plan/DynamicPartitionCtx.java       |  27 --
 .../hive/ql/exec/TestFileSinkOperator.java      | 384 ++++++++++++-------
 7 files changed, 284 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2604d5d..39944a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -493,24 +493,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has "
           + inputObjInspectors.length;
       StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0];
-      // remove the last dpMapping.size() columns from the OI
-      List<? extends StructField> fieldOI = soi.getAllStructFieldRefs();
-      ArrayList<ObjectInspector> newFieldsOI = new ArrayList<ObjectInspector>();
-      ArrayList<String> newFieldsName = new ArrayList<String>();
-      this.dpStartCol = 0;
-      for (StructField sf : fieldOI) {
-        String fn = sf.getFieldName();
-        if (!dpCtx.getInputToDPCols().containsKey(fn)) {
-          newFieldsOI.add(sf.getFieldObjectInspector());
-          newFieldsName.add(sf.getFieldName());
-          this.dpStartCol++;
-        } else {
-          // once we found the start column for partition column we are done
-          break;
-        }
-      }
-      assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty";
-
+      this.dpStartCol = Utilities.getDPColOffset(conf);
       this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol);
       this.dpVals = new ArrayList<String>(numDynParts);
       this.dpWritables = new ArrayList<Object>(numDynParts);
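
The hunk above replaces the old name-based scan of the input object inspector with a single positional offset. For illustration only, and not part of the patch, the toy class below shows the idea: once the offset is known, splitting an incoming row into the data prefix (what subSetOI exposes) and the trailing dynamic-partition values is purely positional, with no lookup against an input-to-DP-column map. The class name RowSplitSketch and the columns (a, b, p) are invented for the example.

  import java.util.Arrays;
  import java.util.List;

  public class RowSplitSketch {
    public static void main(String[] args) {
      // Hypothetical row for a table with data columns (a, b) and one dynamic
      // partition column p appended at the end, as FileSinkOperator sees it.
      List<String> row = Arrays.asList("a-val", "b-val", "2015-10-01");
      int dpStartCol = 2; // in the patch this comes from Utilities.getDPColOffset(conf)
      List<String> dataCols = row.subList(0, dpStartCol);        // what subSetOI would expose
      List<String> dpVals = row.subList(dpStartCol, row.size()); // trailing partition values
      System.out.println(dataCols + " | " + dpVals);             // [a-val, b-val] | [2015-10-01]
    }
  }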

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index bcf85a4..5b21af9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -119,6 +119,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -3916,4 +3917,20 @@ public final class Utilities {
       HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD, "");
     }
   }
+
+  public static int getDPColOffset(FileSinkDesc conf) {
+
+    if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
+      // For deletes, there is only ROW__ID in non-partitioning, non-bucketing columns.
+      //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details.
+      return 1;
+    } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
+      // For updates, ROW__ID is an extra column at index 0.
+      //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details.
+      return getColumnNames(conf.getTableInfo().getProperties()).size() + 1;
+    } else {
+      return getColumnNames(conf.getTableInfo().getProperties()).size();
+    }
+
+  }
 }
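
The new helper derives the offset from the write type and the table's column list rather than from a per-query column-name map. The snippet below is a rough, self-contained mirror of that logic with no Hive classes; DpOffsetSketch, its WriteType enum, and the columns (a, b, p) are hypothetical stand-ins, assuming a table with data columns (a, b) and one dynamic partition column p.

  import java.util.Arrays;
  import java.util.List;

  public class DpOffsetSketch {
    enum WriteType { NOT_ACID, INSERT, UPDATE, DELETE }

    // Mirrors the shape of Utilities.getDPColOffset(FileSinkDesc): DELETE rows carry
    // only ROW__ID before the DP columns, UPDATE rows carry ROW__ID plus every table
    // column, and everything else carries just the table columns.
    static int dpColOffset(WriteType writeType, List<String> tableCols) {
      switch (writeType) {
        case DELETE: return 1;
        case UPDATE: return tableCols.size() + 1;
        default:     return tableCols.size();
      }
    }

    public static void main(String[] args) {
      List<String> tableCols = Arrays.asList("a", "b");
      // Row layouts (the DP column p always comes last):
      //   INSERT / NOT_ACID: (a, b, p)          -> offset 2
      //   UPDATE:            (ROW__ID, a, b, p) -> offset 3
      //   DELETE:            (ROW__ID, p)       -> offset 1
      System.out.println(dpColOffset(WriteType.INSERT, tableCols)); // 2
      System.out.println(dpColOffset(WriteType.UPDATE, tableCols)); // 3
      System.out.println(dpColOffset(WriteType.DELETE, tableCols)); // 1
    }
  }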

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
index 5c6a6df..25156b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
@@ -843,7 +843,7 @@ public final class ConstantPropagateProcFactory {
           }
         }
         if (constant.getTypeInfo().getCategory() != Category.PRIMITIVE) {
-          // nested complex types cannot be folded cleanly 
+          // nested complex types cannot be folded cleanly
           return null;
         }
         Object value = constant.getValue();
@@ -1163,16 +1163,15 @@ public final class ConstantPropagateProcFactory {
       DynamicPartitionCtx dpCtx = fsdesc.getDynPartCtx();
       if (dpCtx != null) {
 
-        // If all dynamic partitions are propagated as constant, remove DP.
-        Set<String> inputs = dpCtx.getInputToDPCols().keySet();
-
         // Assume only 1 parent for FS operator
         Operator<? extends Serializable> parent = op.getParentOperators().get(0);
         Map<ColumnInfo, ExprNodeDesc> parentConstants = cppCtx.getPropagatedConstants(parent);
         RowSchema rs = parent.getSchema();
         boolean allConstant = true;
-        for (String input : inputs) {
-          ColumnInfo ci = rs.getColumnInfo(input);
+        int dpColStartIdx = Utilities.getDPColOffset(fsdesc);
+        List<ColumnInfo> colInfos = rs.getSignature();
+        for (int i = dpColStartIdx; i < colInfos.size(); i++) {
+          ColumnInfo ci = colInfos.get(i);
           if (parentConstants.get(ci) == null) {
             allConstant = false;
             break;
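
With the offset available, the pass can treat the dynamic partition columns as the positional suffix of the parent's row schema instead of resolving them by name. Below is a minimal sketch of that all-constant check; AllConstantDpSketch, the columns (a, b, p), and the plain Java collections standing in for RowSchema and the propagated-constant map are all hypothetical.

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  public class AllConstantDpSketch {
    public static void main(String[] args) {
      // Parent row schema: data columns (a, b) followed by dynamic partition column p.
      List<String> signature = Arrays.asList("a", "b", "p");
      // Constants already folded by the pass, keyed by column name.
      Map<String, String> propagatedConstants = new HashMap<>();
      propagatedConstants.put("p", "'2015-10-01'");
      int dpColStartIdx = 2; // in the patch: Utilities.getDPColOffset(fsdesc)
      boolean allConstant = true;
      for (int i = dpColStartIdx; i < signature.size(); i++) {
        if (propagatedConstants.get(signature.get(i)) == null) {
          allConstant = false; // a DP column did not fold to a constant
          break;
        }
      }
      System.out.println(allConstant); // true -> dynamic partitioning can be dropped
    }
  }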

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 02fbdfe..c696fd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -497,9 +497,6 @@ public final class GenMapRedUtils {
         partsList = PartitionPruner.prune(tsOp, parseCtx, alias_id);
       } catch (SemanticException e) {
         throw e;
-      } catch (HiveException e) {
-        LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
-        throw new SemanticException(e.getMessage(), e);
       }
     }
 
@@ -990,7 +987,7 @@ public final class GenMapRedUtils {
     fileSinkOp.setParentOperators(Utilities.makeList(parent));
 
     // Create a dummy TableScanOperator for the file generated through fileSinkOp
-    TableScanOperator tableScanOp = (TableScanOperator) createTemporaryTableScanOperator(
+    TableScanOperator tableScanOp = createTemporaryTableScanOperator(
             parent.getSchema());
 
     // Connect this TableScanOperator to child.
@@ -1235,19 +1232,16 @@ public final class GenMapRedUtils {
       // adding DP ColumnInfo to the RowSchema signature
       ArrayList<ColumnInfo> signature = inputRS.getSignature();
       String tblAlias = fsInputDesc.getTableInfo().getTableName();
-      LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
       for (String dpCol : dpCtx.getDPColNames()) {
         ColumnInfo colInfo = new ColumnInfo(dpCol,
             TypeInfoFactory.stringTypeInfo, // all partition column type should be string
             tblAlias, true); // partition column is virtual column
         signature.add(colInfo);
-        colMap.put(dpCol, dpCol); // input and output have the same column name
       }
       inputRS.setSignature(signature);
 
       // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
       DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
-      dpCtx2.setInputToDPCols(colMap);
       fsOutputDesc.setDynPartCtx(dpCtx2);
 
       // update the FileSinkOperator to include partition columns
@@ -1896,7 +1890,7 @@ public final class GenMapRedUtils {
         "Partition Names, " + Arrays.toString(partNames) + " don't match partition Types, "
         + Arrays.toString(partTypes));
 
-    Map<String, String> typeMap = new HashMap();
+    Map<String, String> typeMap = new HashMap<>();
     for (int i = 0; i < partNames.length; i++) {
       String previousValue = typeMap.put(partNames[i], partTypes[i]);
       Preconditions.checkArgument(previousValue == null, "Partition columns configuration is inconsistent. "

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index dbc6d8f..4bec228 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -736,7 +736,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     Path dataDir = null;
     if(!qb.getEncryptedTargetTablePaths().isEmpty()) {
       //currently only Insert into T values(...) is supported thus only 1 values clause
-      //and only 1 target table are possible.  If/when support for 
+      //and only 1 target table are possible.  If/when support for
       //select ... from values(...) is added an insert statement may have multiple
       //encrypted target tables.
       dataDir = ctx.getMRTmpPath(qb.getEncryptedTargetTablePaths().get(0).toUri());
@@ -1556,7 +1556,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
       for (String alias : tabAliases) {
         String tab_name = qb.getTabNameForAlias(alias);
-        
+
         // we first look for this alias from CTE, and then from catalog.
         /*
          * if this s a CTE reference: Add its AST as a SubQuery to this QB.
@@ -6830,30 +6830,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
               .getColumnInfos()), input), rowResolver);
       input.setColumnExprMap(colExprMap);
     }
-
-    rowFields = opParseCtx.get(input).getRowResolver()
-        .getColumnInfos();
-    if (deleting()) {
-      // Figure out if we have partition columns in the list or not.  If so,
-      // add them into the mapping.  Partition columns will be located after the row id.
-      if (rowFields.size() > 1) {
-        // This means we have partition columns to deal with, so set up the mapping from the
-        // input to the partition columns.
-        dpCtx.mapInputToDP(rowFields.subList(1, rowFields.size()));
-      }
-    } else if (updating()) {
-      // In this case we expect the number of in fields to exceed the number of out fields by one
-      // (for the ROW__ID virtual column).  If there are more columns than this,
-      // then the extras are for dynamic partitioning
-      if (dynPart && dpCtx != null) {
-        dpCtx.mapInputToDP(rowFields.subList(tableFields.size() + 1, rowFields.size()));
-      }
-    } else {
-      if (dynPart && dpCtx != null) {
-        // create the mapping from input ExprNode to dest table DP column
-        dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
-      }
-    }
     return input;
   }
 
@@ -10105,7 +10081,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         return;
       }
       for (Node child : node.getChildren()) {
-        //each insert of multi insert looks like 
+        //each insert of multi insert looks like
         //(TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME T1)))
         if (((ASTNode) child).getToken().getType() != HiveParser.TOK_INSERT) {
           continue;

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 24db7d0..95d5635 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -19,14 +19,11 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.metadata.Table;
 
 public class DynamicPartitionCtx implements Serializable {
@@ -43,8 +40,6 @@ public class DynamicPartitionCtx implements Serializable {
   private Path rootPath; // the root path DP columns paths start from
   private int numBuckets;  // number of buckets in each partition
 
-  private Map<String, String> inputToDPCols; // mapping from input column names to DP columns
-
   private List<String> spNames; // sp column names
   private List<String> dpNames; // dp column names
   private String defaultPartName; // default partition name in case of null or empty value
@@ -71,7 +66,6 @@ public class DynamicPartitionCtx implements Serializable {
     }
     this.numDPCols = dpNames.size();
     this.numSPCols = spNames.size();
-    this.inputToDPCols = new HashMap<String, String>();
     if (this.numSPCols > 0) {
       this.spPath = Warehouse.makeDynamicPartName(partSpec);
     } else {
@@ -86,25 +80,12 @@ public class DynamicPartitionCtx implements Serializable {
     this.spPath = dp.spPath;
     this.rootPath = dp.rootPath;
     this.numBuckets = dp.numBuckets;
-    this.inputToDPCols = dp.inputToDPCols;
     this.spNames = dp.spNames;
     this.dpNames = dp.dpNames;
     this.defaultPartName = dp.defaultPartName;
     this.maxPartsPerNode = dp.maxPartsPerNode;
   }
 
-  public void mapInputToDP(List<ColumnInfo> fs) {
-
-      assert fs.size() == this.numDPCols: "input DP column size != numDPCols";
-
-      Iterator<ColumnInfo> itr1 = fs.iterator();
-      Iterator<String> itr2 = dpNames.iterator();
-
-      while (itr1.hasNext() && itr2.hasNext()) {
-        inputToDPCols.put(itr1.next().getInternalName(), itr2.next());
-      }
-  }
-
   public int getMaxPartitionsPerNode() {
     return this.maxPartsPerNode;
   }
@@ -161,14 +142,6 @@ public class DynamicPartitionCtx implements Serializable {
     this.spNames = sp;
   }
 
-  public Map<String, String> getInputToDPCols() {
-    return this.inputToDPCols;
-  }
-
-  public void setInputToDPCols(Map<String, String> map) {
-    this.inputToDPCols = map;
-  }
-
   public void setNumDPCols(int dp) {
     this.numDPCols = dp;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index c6ae030..9e89376 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -45,12 +44,11 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -77,7 +75,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -94,8 +91,7 @@ public class TestFileSinkOperator {
   private static TableDesc nonAcidTableDescriptor;
   private static TableDesc acidTableDescriptor;
   private static ObjectInspector inspector;
-  private static List<TFSORow> rows;
-  private static ValidTxnList txnList;
+  private static List<Row> rows;
 
   private Path basePath;
   private JobConf jc;
@@ -105,34 +101,33 @@ public class TestFileSinkOperator {
     Properties properties = new Properties();
     properties.setProperty(serdeConstants.SERIALIZATION_LIB, TFSOSerDe.class.getName());
     nonAcidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
+    properties.setProperty(serdeConstants.LIST_COLUMNS,"data");
     properties = new Properties(properties);
     properties.setProperty(hive_metastoreConstants.BUCKET_COUNT, "1");
     acidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
-
     tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") +
         "testFileSinkOperator");
     tmpdir.mkdir();
     tmpdir.deleteOnExit();
-    txnList = new ValidReadTxnList(new long[]{}, 2);
   }
 
   @Test
   public void testNonAcidWrite() throws Exception {
     setBasePath("write");
-    setupData(DataFormat.SIMPLE);
+    setupData(DataFormat.WITH_PARTITION_VALUE);
     FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, false, 0);
     processRows(op);
-    confirmOutput();
+    confirmOutput(DataFormat.WITH_PARTITION_VALUE);
   }
 
   @Test
   public void testInsert() throws Exception {
     setBasePath("insert");
-    setupData(DataFormat.SIMPLE);
+    setupData(DataFormat.WITH_PARTITION_VALUE);
     FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, false, 1);
     processRows(op);
     Assert.assertEquals("10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
-    confirmOutput();
+    confirmOutput(DataFormat.WITH_PARTITION_VALUE);
   }
 
   @Test
@@ -142,7 +137,7 @@ public class TestFileSinkOperator {
     FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, false, 2);
     processRows(op);
     Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
-    confirmOutput();
+    confirmOutput(DataFormat.WITH_RECORD_ID);
   }
 
   @Test
@@ -152,7 +147,7 @@ public class TestFileSinkOperator {
     FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, false, 2);
     processRows(op);
     Assert.assertEquals("-10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
-    confirmOutput();
+    confirmOutput(DataFormat.WITH_RECORD_ID);
   }
 
   @Test
@@ -161,7 +156,7 @@ public class TestFileSinkOperator {
     setupData(DataFormat.WITH_PARTITION_VALUE);
     FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, true, 0);
     processRows(op);
-    confirmOutput();
+    confirmOutput(DataFormat.WITH_PARTITION_VALUE);
   }
 
 
@@ -174,7 +169,7 @@ public class TestFileSinkOperator {
     // We only expect 5 here because we'll get whichever of the partitions published its stats
     // last.
     Assert.assertEquals("5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
-    confirmOutput();
+    confirmOutput(DataFormat.WITH_PARTITION_VALUE);
   }
 
   @Test
@@ -184,19 +179,19 @@ public class TestFileSinkOperator {
     FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, true, 2);
     processRows(op);
     Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
-    confirmOutput();
+    confirmOutput(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
   }
 
   @Test
   public void testDeleteDynamicPartitioning() throws Exception {
     setBasePath("deleteDP");
-    setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
+    setupData(DataFormat.WITH_RECORD_ID);
     FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, true, 2);
     processRows(op);
     // We only expect -5 here because we'll get whichever of the partitions published its stats
     // last.
     Assert.assertEquals("-5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
-    confirmOutput();
+    confirmOutput(DataFormat.WITH_RECORD_ID);
   }
 
 
@@ -217,64 +212,52 @@ public class TestFileSinkOperator {
 
   }
 
-  private enum DataFormat {SIMPLE, WITH_RECORD_ID, WITH_PARTITION_VALUE,
-    WITH_RECORD_ID_AND_PARTITION_VALUE};
+  private enum DataFormat {WITH_RECORD_ID, WITH_PARTITION_VALUE, WITH_RECORD_ID_AND_PARTITION_VALUE};
 
   private void setupData(DataFormat format) {
 
-    // Build object inspector
-    inspector = ObjectInspectorFactory.getReflectionObjectInspector
-        (TFSORow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    rows = new ArrayList<TFSORow>();
-
+    Class<?> rType;
     switch (format) {
-      case SIMPLE:
-        // Build rows
-        for (int i = 0; i < 10; i++) {
-          rows.add(
-              new TFSORow(
-                  new Text("mary had a little lamb")
-              )
-          );
-        }
+      case WITH_PARTITION_VALUE:
+        rType = RowWithPartVal.class;
         break;
-
       case WITH_RECORD_ID:
-        for (int i = 0; i < 10; i++) {
-          rows.add(
-              new TFSORow(
-                  new Text("its fleect was white as snow"),
-                  new RecordIdentifier(1, 1, i)
-              )
-          );
-        }
+        rType = RowWithRecID.class;
         break;
-
-      case WITH_PARTITION_VALUE:
-        for (int i = 0; i < 10; i++) {
-          rows.add(
-              new TFSORow(
-                  new Text("its fleect was white as snow"),
-                  (i < 5) ? new Text("Monday") : new Text("Tuesday")
-              )
-          );
-        }
-        break;
-
       case WITH_RECORD_ID_AND_PARTITION_VALUE:
-        for (int i = 0; i < 10; i++) {
-          rows.add(
-              new TFSORow(
-                  new Text("its fleect was white as snow"),
-                  (i < 5) ? new Text("Monday") : new Text("Tuesday"),
-                  new RecordIdentifier(1, 1, i)
-              )
-          );
-        }
+        rType = RowWithPartNRecID.class;
         break;
-
       default:
-        throw new RuntimeException("Unknown option!");
+        throw new RuntimeException("Unknown type");
+    }
+    inspector = ObjectInspectorFactory.getReflectionObjectInspector
+        (rType, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+    rows = new ArrayList<Row>();
+    Row r;
+    for (int i = 0; i < 10; i++) {
+      switch (format) {
+        case WITH_PARTITION_VALUE:
+          r =
+          new RowWithPartVal(
+              new Text("mary had a little lamb"),
+              (i < 5) ? new Text("Monday") : new Text("Tuesday"));
+          break;
+        case WITH_RECORD_ID:
+          r = new RowWithRecID(new RecordIdentifier(1, 1, i),
+              (i < 5) ? new Text("Monday") : new Text("Tuesday"));
+          break;
+        case WITH_RECORD_ID_AND_PARTITION_VALUE:
+          r = new RowWithPartNRecID(
+              new Text("its fleect was white as snow"),
+              (i < 5) ? new Text("Monday") : new Text("Tuesday"),
+                  new RecordIdentifier(1, 1, i));
+          break;
+        default:
+          throw new RuntimeException("Unknown data format");
+      }
+      rows.add(r);
+
     }
   }
 
@@ -300,9 +283,6 @@ public class TestFileSinkOperator {
       Map<String, String> partColMap= new LinkedHashMap<String, String>(1);
       partColMap.put(PARTCOL_NAME, null);
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
-      Map<String, String> partColNames = new HashMap<String, String>(1);
-      partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
-      dpCtx.setInputToDPCols(partColNames);
       //todo: does this need the finalDestination?
       desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
     } else {
@@ -320,27 +300,27 @@ public class TestFileSinkOperator {
   }
 
   private void processRows(FileSinkOperator op) throws HiveException {
-    for (TFSORow r : rows) op.process(r, 0);
+    for (Object r : rows) op.process(r, 0);
     op.jobCloseOp(jc, true);
     op.close(false);
   }
 
-  private void confirmOutput() throws IOException, SerDeException {
+  private void confirmOutput(DataFormat rType) throws IOException, SerDeException, CloneNotSupportedException {
     Path[] paths = findFilesInBasePath();
-    TFSOInputFormat input = new TFSOInputFormat();
+    TFSOInputFormat input = new TFSOInputFormat(rType);
     FileInputFormat.setInputPaths(jc, paths);
 
     InputSplit[] splits = input.getSplits(jc, 1);
-    RecordReader<NullWritable, TFSORow> reader = input.getRecordReader(splits[0], jc,
+    RecordReader<NullWritable, Row> reader = input.getRecordReader(splits[0], jc,
         Mockito.mock(Reporter.class));
     NullWritable key = reader.createKey();
-    TFSORow value = reader.createValue();
-    List<TFSORow> results = new ArrayList<TFSORow>(rows.size());
-    List<TFSORow> sortedRows = new ArrayList<TFSORow>(rows.size());
+    Row value = reader.createValue();
+    List<Row> results = new ArrayList<Row>(rows.size());
+    List<Row> sortedRows = new ArrayList<Row>(rows.size());
     for (int i = 0; i < rows.size(); i++) {
       Assert.assertTrue(reader.next(key, value));
-      results.add(new TFSORow(value));
-      sortedRows.add(new TFSORow(rows.get(i)));
+      results.add(value.clone());
+      sortedRows.add(rows.get(i));
     }
     Assert.assertFalse(reader.next(key, value));
     Collections.sort(results);
@@ -370,36 +350,172 @@ public class TestFileSinkOperator {
     }
   }
 
-  private static class TFSORow implements WritableComparable<TFSORow> {
+  public static interface Row extends WritableComparable<Row> {
+
+    Row clone() throws CloneNotSupportedException;
+  }
+
+  private static class RowWithRecID implements Row {
+
     private RecordIdentifier recId;
-    private Text data;
     private Text partVal;
 
-    TFSORow() {
-      this(null, null, null);
+    public RowWithRecID() {
+    }
+    public RowWithRecID(RecordIdentifier recId, Text partVal) {
+      super();
+      this.recId = recId;
+      this.partVal = partVal;
     }
 
-    TFSORow(Text t) {
-      this(t, null, null);
+    @Override
+    public
+    Row clone() throws CloneNotSupportedException {
+      return new RowWithRecID(this.recId, this.partVal);
     }
 
-    TFSORow(Text t, Text pv) {
-      this(t, pv, null);
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      if (partVal == null) {
+        dataOutput.writeBoolean(false);
+      } else {
+        dataOutput.writeBoolean(true);
+        partVal.write(dataOutput);
+      }
+      if (recId == null) {
+        dataOutput.writeBoolean(false);
+      } else {
+        dataOutput.writeBoolean(true);
+        recId.write(dataOutput);
+      }
     }
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      boolean notNull = dataInput.readBoolean();
+      if (notNull) {
+        partVal = new Text();
+        partVal.readFields(dataInput);
+      }
+      notNull = dataInput.readBoolean();
+      if (notNull) {
+        recId = new RecordIdentifier();
+        recId.readFields(dataInput);
+      }
 
-    TFSORow(Text t, RecordIdentifier ri) {
-      this(t, null, ri);
     }
+    @Override
+    public int compareTo(Row row) {
+      RowWithRecID other = (RowWithRecID) row;
+      if (recId == null && other.recId == null) {
+        return comparePartVal(other);
+      } else if (recId == null) {
+        return -1;
+      } else {
+        int rc = recId.compareTo(other.recId);
+        if (rc == 0) return comparePartVal(other);
+        else return rc;
+      }
+    }
+    private int comparePartVal(RowWithRecID other) {
 
-    TFSORow(Text t, Text pv, RecordIdentifier ri) {
+        return partVal.compareTo(other.partVal);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return compareTo((RowWithRecID)obj) == 0;
+    }
+  }
+  private static class RowWithPartVal implements Row {
+
+    public RowWithPartVal(Text data, Text partVal) {
+      super();
+      this.data = data;
+      this.partVal = partVal;
+    }
+
+    public RowWithPartVal() {
+    }
+
+    private  Text data;
+    private  Text partVal;
+
+    @Override
+    public Row clone() throws CloneNotSupportedException {
+      return new RowWithPartVal(this.data, this.partVal);
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      data.write(dataOutput);
+      if (partVal == null) {
+        dataOutput.writeBoolean(false);
+      } else {
+        dataOutput.writeBoolean(true);
+        partVal.write(dataOutput);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      data = new Text();
+      data.readFields(dataInput);
+      boolean notNull = dataInput.readBoolean();
+      if (notNull) {
+        partVal = new Text();
+        partVal.readFields(dataInput);
+      }
+    }
+
+    @Override
+    public int compareTo(Row row) {
+      RowWithPartVal other = (RowWithPartVal) row;
+      if (partVal == null && other.partVal == null) {
+        return compareData(other);
+      } else if (partVal == null) {
+        return -1;
+      } else {
+        int rc = partVal.compareTo(other.partVal);
+        if (rc == 0) return compareData(other);
+        else return rc;
+      }
+    }
+
+    private int compareData(RowWithPartVal other) {
+      if (data == null && other.data == null) return 0;
+      else if (data == null) return -1;
+      else return data.compareTo(other.data);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof RowWithPartVal) {
+        RowWithPartVal other = (RowWithPartVal) obj;
+        return compareTo(other) == 0;
+
+      } else {
+        return false;
+      }
+    }
+  }
+  private static class RowWithPartNRecID implements Row {
+    private RecordIdentifier recId;
+    private Text data;
+    private Text partVal;
+
+    RowWithPartNRecID() {
+      this(null, null, null);
+    }
+
+    RowWithPartNRecID(Text t, Text pv, RecordIdentifier ri) {
       data = t;
       partVal = pv;
       recId = ri;
-
     }
 
-    TFSORow(TFSORow other) {
-      this(other.data, other.partVal, other.recId);
+    @Override
+    public RowWithPartNRecID clone() throws CloneNotSupportedException {
+      return new RowWithPartNRecID(this.data, this.partVal, this.recId);
     }
 
     @Override
@@ -437,8 +553,8 @@ public class TestFileSinkOperator {
 
     @Override
     public boolean equals(Object obj) {
-      if (obj instanceof TFSORow) {
-        TFSORow other = (TFSORow) obj;
+      if (obj instanceof RowWithPartNRecID) {
+        RowWithPartNRecID other = (RowWithPartNRecID) obj;
         if (data == null && other.data == null) return checkPartVal(other);
         else if (data == null) return false;
         else if (data.equals(other.data)) return checkPartVal(other);
@@ -448,21 +564,22 @@ public class TestFileSinkOperator {
       }
     }
 
-    private boolean checkPartVal(TFSORow other) {
+    private boolean checkPartVal(RowWithPartNRecID other) {
       if (partVal == null && other.partVal == null) return checkRecId(other);
       else if (partVal == null) return false;
       else if (partVal.equals(other.partVal)) return checkRecId(other);
       else return false;
     }
 
-    private boolean checkRecId(TFSORow other) {
+    private boolean checkRecId(RowWithPartNRecID other) {
       if (recId == null && other.recId == null) return true;
       else if (recId == null) return false;
       else return recId.equals(other.recId);
     }
 
     @Override
-    public int compareTo(TFSORow other) {
+    public int compareTo(Row row) {
+      RowWithPartNRecID other = (RowWithPartNRecID) row;
       if (recId == null && other.recId == null) {
         return comparePartVal(other);
       } else if (recId == null) {
@@ -474,7 +591,7 @@ public class TestFileSinkOperator {
       }
     }
 
-    private int comparePartVal(TFSORow other) {
+    private int comparePartVal(RowWithPartNRecID other) {
       if (partVal == null && other.partVal == null) {
         return compareData(other);
       } else if (partVal == null) {
@@ -486,21 +603,26 @@ public class TestFileSinkOperator {
       }
     }
 
-    private int compareData(TFSORow other) {
+    private int compareData(RowWithPartNRecID other) {
       if (data == null && other.data == null) return 0;
       else if (data == null) return -1;
       else return data.compareTo(other.data);
     }
   }
 
-  private static class TFSOInputFormat extends FileInputFormat<NullWritable, TFSORow>
-                                       implements AcidInputFormat<NullWritable, TFSORow> {
+  private static class TFSOInputFormat extends FileInputFormat<NullWritable, Row>
+                                       implements AcidInputFormat<NullWritable, Row> {
 
     FSDataInputStream in[] = null;
     int readingFrom = -1;
+    DataFormat rType;
+
+    public TFSOInputFormat(DataFormat rType) {
+      this.rType = rType;
+    }
 
     @Override
-    public RecordReader<NullWritable, TFSORow> getRecordReader(
+    public RecordReader<NullWritable, Row> getRecordReader(
         InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
       if (in == null) {
         Path paths[] = FileInputFormat.getInputPaths(entries);
@@ -511,10 +633,10 @@ public class TestFileSinkOperator {
         }
         readingFrom = 0;
       }
-      return new RecordReader<NullWritable, TFSORow>() {
+      return new RecordReader<NullWritable, Row>() {
 
         @Override
-        public boolean next(NullWritable nullWritable, TFSORow tfsoRecord) throws
+        public boolean next(NullWritable nullWritable, Row tfsoRecord) throws
             IOException {
           try {
             tfsoRecord.readFields(in[readingFrom]);
@@ -532,8 +654,18 @@ public class TestFileSinkOperator {
         }
 
         @Override
-        public TFSORow createValue() {
-          return new TFSORow();
+        public Row createValue() {
+          switch (rType) {
+            case WITH_RECORD_ID_AND_PARTITION_VALUE:
+              return new RowWithPartNRecID();
+            case WITH_PARTITION_VALUE:
+              return new RowWithPartVal();
+            case WITH_RECORD_ID:
+              return new RowWithRecID();
+
+            default:
+              throw new RuntimeException("Unknown row Type");
+          }
         }
 
         @Override
@@ -554,14 +686,14 @@ public class TestFileSinkOperator {
     }
 
     @Override
-    public RowReader<TFSORow> getReader(InputSplit split,
+    public RowReader<Row> getReader(InputSplit split,
                                            Options options) throws
         IOException {
       return null;
     }
 
     @Override
-    public RawReader<TFSORow> getRawReader(Configuration conf,
+    public RawReader<Row> getRawReader(Configuration conf,
                                               boolean collapseEvents,
                                               int bucket,
                                               ValidTxnList validTxnList,
@@ -578,9 +710,9 @@ public class TestFileSinkOperator {
     }
   }
 
-  public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, TFSORow>
-      implements AcidOutputFormat<NullWritable, TFSORow> {
-    List<TFSORow> records = new ArrayList<TFSORow>();
+  public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, Row>
+      implements AcidOutputFormat<NullWritable, Row> {
+    List<Row> records = new ArrayList<>();
     long numRecordsAdded = 0;
     FSDataOutputStream out = null;
 
@@ -588,7 +720,6 @@ public class TestFileSinkOperator {
     public RecordUpdater getRecordUpdater(final Path path, final Options options) throws
         IOException {
 
-      final StructObjectInspector inspector = (StructObjectInspector)options.getInspector();
       return new RecordUpdater() {
         @Override
         public void insert(long currentTransaction, Object row) throws IOException {
@@ -608,9 +739,8 @@ public class TestFileSinkOperator {
         }
 
         private void addRow(Object row) {
-          assert row instanceof TFSORow : "Expected TFSORow but got " +
-              row.getClass().getName();
-          records.add((TFSORow)row);
+          assert row instanceof Row : "Expected Row but got " + row.getClass().getName();
+          records.add((Row)row);
         }
 
         @Override
@@ -619,7 +749,7 @@ public class TestFileSinkOperator {
             FileSystem fs = path.getFileSystem(options.getConfiguration());
             out = fs.create(path);
           }
-          for (TFSORow r : records) r.write(out);
+          for (Writable r : records) r.write(out);
           records.clear();
           out.flush();
         }
@@ -657,8 +787,8 @@ public class TestFileSinkOperator {
       return new FileSinkOperator.RecordWriter() {
         @Override
         public void write(Writable w) throws IOException {
-          Assert.assertTrue(w instanceof TFSORow);
-          records.add((TFSORow) w);
+          Assert.assertTrue(w instanceof Row);
+          records.add((Row)w);
         }
 
         @Override
@@ -667,7 +797,7 @@ public class TestFileSinkOperator {
             FileSystem fs = finalOutPath.getFileSystem(jc);
             out = fs.create(finalOutPath);
           }
-          for (TFSORow r : records) r.write(out);
+          for (Writable r : records) r.write(out);
           records.clear();
           out.flush();
           out.close();
@@ -676,7 +806,7 @@ public class TestFileSinkOperator {
     }
 
     @Override
-    public RecordWriter<NullWritable, TFSORow> getRecordWriter(
+    public RecordWriter<NullWritable, Row> getRecordWriter(
         FileSystem fileSystem, JobConf entries, String s, Progressable progressable) throws
         IOException {
       return null;
@@ -688,7 +818,7 @@ public class TestFileSinkOperator {
     }
   }
 
-  public static class TFSOSerDe implements SerDe {
+  public static class TFSOSerDe extends AbstractSerDe {
 
     @Override
     public void initialize(Configuration conf, Properties tbl) throws SerDeException {
@@ -697,20 +827,18 @@ public class TestFileSinkOperator {
 
     @Override
     public Class<? extends Writable> getSerializedClass() {
-      return TFSORow.class;
+      return RowWithPartNRecID.class;
     }
 
     @Override
     public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
-      assert obj instanceof TFSORow : "Expected TFSORow or decendent, got "
-          + obj.getClass().getName();
-      return (TFSORow)obj;
+      assert obj instanceof Row : "Expected Row or decendent, got " + obj.getClass().getName();
+      return (Row)obj;
     }
 
     @Override
     public Object deserialize(Writable blob) throws SerDeException {
-      assert blob instanceof TFSORow : "Expected TFSORow or decendent, got "
-          + blob.getClass().getName();
+      assert blob instanceof Row : "Expected Row or decendent, got "+ blob.getClass().getName();
       return blob;
     }