You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2014/03/26 19:14:39 UTC

svn commit: r1581977 [2/5] - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/common/ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/ metastore/src/java/org/apache/hadoop/hive/metastore/ metastore/src/java/org/apache/hadoop/hi...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Wed Mar 26 18:14:37 2014
@@ -20,8 +20,8 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -36,25 +36,28 @@ import org.apache.hadoop.fs.BlockLocatio
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnListImpl;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.orc.Metadata;
-import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
-import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
@@ -72,17 +75,37 @@ import com.google.common.cache.CacheBuil
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * A MapReduce/Hive input format for ORC files.
+ * <p>
+ * This class implements both the classic InputFormat, which stores the rows
+ * directly, and AcidInputFormat, which stores a series of events with the
+ * following schema:
+ * <pre>
+ *   class AcidEvent&lt;ROW&gt; {
+ *     enum ACTION {INSERT, UPDATE, DELETE}
+ *     ACTION operation;
+ *     long originalTransaction;
+ *     int bucket;
+ *     long rowId;
+ *     long currentTransaction;
+ *     ROW row;
+ *   }
+ * </pre>
+ * Each AcidEvent object corresponds to an update event. The
+ * originalTransaction, bucket, and rowId are the unique identifier for the row.
+ * The operation and currentTransaction are the operation and the transaction
+ * that added this event. Insert and update events include the entire row, while
+ * delete events have null for row.
  */
 public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
-  InputFormatChecker, VectorizedInputFormatInterface {
-
-  VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
+  InputFormatChecker, VectorizedInputFormatInterface,
+    AcidInputFormat<OrcStruct> {
 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
+  static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
   static final String MIN_SPLIT_SIZE =
-      ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE");
+      SHIMS.getHadoopConfNames().get("MAPREDMINSPLITSIZE");
   static final String MAX_SPLIT_SIZE =
-      ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
+      SHIMS.getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
 
   private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
   private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
@@ -113,13 +136,13 @@ public class OrcInputFormat  implements 
 
 
     OrcRecordReader(Reader file, Configuration conf,
-                    long offset, long length) throws IOException {
+                    FileSplit split) throws IOException {
       List<OrcProto.Type> types = file.getTypes();
       this.file = file;
       numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+      this.offset = split.getStart();
+      this.length = split.getLength();
       this.reader = createReaderFromFile(file, conf, offset, length);
-      this.offset = offset;
-      this.length = length;
       this.stats = new SerDeStats();
     }
 
@@ -166,139 +189,107 @@ public class OrcInputFormat  implements 
       return stats;
     }
   }
-  
-  static RecordReader createReaderFromFile(
-      Reader file, Configuration conf, long offset, long length)
-      throws IOException {
-    List<OrcProto.Type> types = file.getTypes();
-    boolean[] includedColumns = findIncludedColumns(types, conf);
-    String[] columnNames = getIncludedColumnNames(types, includedColumns,
-        conf);
-    SearchArgument sarg = createSarg(types, conf);
-    RecordReader reader =
-        file.rows(offset, length, includedColumns, sarg, columnNames);
-    return reader;
+
+  /**
+   * Get the root column for the row. In ACID format files, it is offset by
+   * the extra metadata columns.
+   * @param isOriginal is the file in the original format?
+   * @return the column number for the root of row.
+   */
+  private static int getRootColumn(boolean isOriginal) {
+    return isOriginal ? 0 : (OrcRecordUpdater.ROW + 1);
   }
 
-  private static final PathFilter hiddenFileFilter = new PathFilter(){
-    public boolean accept(Path p){
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
+  public static RecordReader createReaderFromFile(Reader file,
+                                                  Configuration conf,
+                                                  long offset, long length
+                                                  ) throws IOException {
+    Reader.Options options = new Reader.Options().range(offset, length);
+    boolean isOriginal =
+        !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+    List<OrcProto.Type> types = file.getTypes();
+    setIncludedColumns(options, types, conf, isOriginal);
+    setSearchArgument(options, types, conf, isOriginal);
+    return file.rowsOptions(options);
+  }
 
   /**
    * Recurse down into a type subtree turning on all of the sub-columns.
    * @param types the types of the file
    * @param result the global view of columns that should be included
    * @param typeId the root of tree to enable
+   * @param rootColumn the top column
    */
-  static void includeColumnRecursive(List<OrcProto.Type> types,
+  private static void includeColumnRecursive(List<OrcProto.Type> types,
                                              boolean[] result,
-                                             int typeId) {
-    result[typeId] = true;
+                                             int typeId,
+                                             int rootColumn) {
+    result[typeId - rootColumn] = true;
     OrcProto.Type type = types.get(typeId);
     int children = type.getSubtypesCount();
     for(int i=0; i < children; ++i) {
-      includeColumnRecursive(types, result, type.getSubtypes(i));
-    }
-  }
-
-  public static SearchArgument createSarg(List<OrcProto.Type> types, Configuration conf) {
-    String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-    if (serializedPushdown == null
-        || conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) == null) {
-      LOG.debug("No ORC pushdown predicate");
-      return null;
-    }
-    SearchArgument sarg = SearchArgument.FACTORY.create
-        (Utilities.deserializeExpression(serializedPushdown));
-    LOG.info("ORC pushdown predicate: " + sarg);
-    return sarg;
-  }
-
-  public static String[] getIncludedColumnNames(
-      List<OrcProto.Type> types, boolean[] includedColumns, Configuration conf) {
-    String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("included columns names = " + columnNamesString);
-    }
-    if (columnNamesString == null || conf.get(TableScanDesc.FILTER_EXPR_CONF_STR) == null) {
-      return null;
-    }
-    String[] neededColumnNames = columnNamesString.split(",");
-    int i = 0;
-    String[] columnNames = new String[types.size()];
-    for(int columnId: types.get(0).getSubtypesList()) {
-      if (includedColumns == null || includedColumns[columnId]) {
-        columnNames[columnId] = neededColumnNames[i++];
-      }
+      includeColumnRecursive(types, result, type.getSubtypes(i), rootColumn);
     }
-    return columnNames;
   }
 
   /**
    * Take the configuration and figure out which columns we need to include.
-   * @param types the types of the file
+   * @param options the options to update
+   * @param types the types for the file
    * @param conf the configuration
-   * @return true for each column that should be included
+   * @param isOriginal is the file in the original format?
    */
-  public static boolean[] findIncludedColumns(List<OrcProto.Type> types, Configuration conf) {
-    LOG.info("included column ids = " + conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-    if (ColumnProjectionUtils.isReadAllColumns(conf)) {
-      return null;
-    } else {
-      int numColumns = types.size();
+  static void setIncludedColumns(Reader.Options options,
+                                 List<OrcProto.Type> types,
+                                 Configuration conf,
+                                 boolean isOriginal) {
+    int rootColumn = getRootColumn(isOriginal);
+    if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
+      int numColumns = types.size() - rootColumn;
       boolean[] result = new boolean[numColumns];
       result[0] = true;
-      OrcProto.Type root = types.get(0);
+      OrcProto.Type root = types.get(rootColumn);
       List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
       for(int i=0; i < root.getSubtypesCount(); ++i) {
         if (included.contains(i)) {
-          includeColumnRecursive(types, result, root.getSubtypes(i));
+          includeColumnRecursive(types, result, root.getSubtypes(i),
+              rootColumn);
         }
       }
-      // if we are filtering at least one column, return the boolean array
-      for(boolean include: result) {
-        if (!include) {
-          return result;
-        }
-      }
-      return null;
+      options.include(result);
+    } else {
+      options.include(null);
     }
   }
 
-  @SuppressWarnings("unchecked")
-  @Override
-  public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
-      getRecordReader(InputSplit inputSplit, JobConf conf,
-                      Reporter reporter) throws IOException {
-    if (isVectorMode(conf)) {
-      org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
-          reporter);
-      return (org.apache.hadoop.mapred.RecordReader) vorr;
-    }
-    FileSplit fSplit = (FileSplit)inputSplit;
-    reporter.setStatus(fSplit.toString());
-    Path path = fSplit.getPath();
-    FileSystem fs = path.getFileSystem(conf);
-    Reader reader = null;
-
-    if(!(fSplit instanceof OrcSplit)){
-      //If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
-      reader = OrcFile.createReader(fs, path, conf);
+  static void setSearchArgument(Reader.Options options,
+                                List<OrcProto.Type> types,
+                                Configuration conf,
+                                boolean isOriginal) {
+    int rootColumn = getRootColumn(isOriginal);
+    String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+    String columnNamesString =
+        conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    if (serializedPushdown == null || columnNamesString == null) {
+      LOG.debug("No ORC pushdown predicate");
+      options.searchArgument(null, null);
     } else {
-      //We have OrcSplit, which may have footer metadata cached, so use the appropriate reader
-      //constructor
-      OrcSplit orcSplit = (OrcSplit) fSplit;
-      if (orcSplit.hasFooter()) {
-        FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
-        reader = OrcFile.createReader(fs, path, fMetaInfo, conf);
-      } else {
-        reader = OrcFile.createReader(fs, path, conf);
+      SearchArgument sarg = SearchArgument.FACTORY.create
+          (Utilities.deserializeExpression(serializedPushdown));
+      LOG.info("ORC pushdown predicate: " + sarg);
+      String[] neededColumnNames = columnNamesString.split(",");
+      String[] columnNames = new String[types.size() - rootColumn];
+      boolean[] includedColumns = options.getInclude();
+      int i = 0;
+      for(int columnId: types.get(rootColumn).getSubtypesList()) {
+        if (includedColumns == null || includedColumns[columnId - rootColumn]) {
+          // both arrays are indexed relative to rootColumn; the offset is
+          // positive because a type's children have ids greater than its own.
+          columnNames[columnId - rootColumn] = neededColumnNames[i++];
+        }
       }
+      options.searchArgument(sarg, columnNames);
     }
-    return new OrcRecordReader(reader, conf, fSplit.getStart(), fSplit.getLength());
   }
 
   @Override
@@ -306,8 +297,8 @@ public class OrcInputFormat  implements 
                                ArrayList<FileStatus> files
                               ) throws IOException {
 
-    if (isVectorMode(conf)) {
-      return voif.validateInput(fs, conf, files);
+    if (Utilities.isVectorMode(conf)) {
+      return new VectorizedOrcInputFormat().validateInput(fs, conf, files);
     }
 
     if (files.size() <= 0) {
@@ -315,7 +306,8 @@ public class OrcInputFormat  implements 
     }
     for (FileStatus file : files) {
       try {
-        OrcFile.createReader(fs, file.getPath(), conf);
+        OrcFile.createReader(file.getPath(),
+            OrcFile.readerOptions(conf).filesystem(fs));
       } catch (IOException e) {
         return false;
       }
@@ -323,10 +315,6 @@ public class OrcInputFormat  implements 
     return true;
   }
 
-  private boolean isVectorMode(Configuration conf) {
-    return Utilities.isVectorMode(conf);
-  }
-
   /**
    * Get the list of input {@link Path}s for the map-reduce job.
    *
@@ -351,43 +339,13 @@ public class OrcInputFormat  implements 
    * the different worker threads.
    */
   static class Context {
-    static class FileSplitInfo {
-      FileSplitInfo(Path file, long start, long length, String[] hosts,
-          FileMetaInfo fileMetaInfo) {
-        this.file = file;
-        this.start = start;
-        this.length = length;
-        this.hosts = hosts;
-        this.fileMetaInfo = fileMetaInfo;
-      }
-      Path getPath() {
-        return file;
-      }
-      long getStart() {
-        return start;
-      }
-      long getLength() {
-        return length;
-      }
-      String[] getLocations() {
-        return hosts;
-      }
-      FileMetaInfo getFileMetaInfo() {
-        return fileMetaInfo;
-      }
-      private Path file;
-      private long start;
-      private long length;
-      private String[] hosts;
-      FileMetaInfo fileMetaInfo;
-    }
     private final Configuration conf;
     private static Cache<Path, FileInfo> footerCache;
     private final ExecutorService threadPool;
-    private final List<FileSplitInfo> splits =
-        new ArrayList<FileSplitInfo>(10000);
+    private final List<OrcSplit> splits =
+        new ArrayList<OrcSplit>(10000);
+    private final int numBuckets;
     private final List<Throwable> errors = new ArrayList<Throwable>();
-    private final HadoopShims shims = ShimLoader.getHadoopShims();
     private final long maxSize;
     private final long minSize;
     private final boolean footerInSplits;
@@ -395,6 +353,7 @@ public class OrcInputFormat  implements 
     private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
     private final AtomicInteger numFilesCounter = new AtomicInteger(0);
     private Throwable fatalError = null;
+    private ValidTxnList transactionList;
 
     /**
      * A count of the number of threads that may create more work for the
@@ -406,15 +365,20 @@ public class OrcInputFormat  implements 
       this.conf = conf;
       minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
       maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
-      footerInSplits = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS);
+      footerInSplits = HiveConf.getBoolVar(conf,
+          ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS);
+      numBuckets =
+          Math.max(conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0), 0);
       int cacheStripeDetailsSize = HiveConf.getIntVar(conf,
           ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
-      int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+      int numThreads = HiveConf.getIntVar(conf,
+          ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
 
       cacheStripeDetails = (cacheStripeDetailsSize > 0);
 
       threadPool = Executors.newFixedThreadPool(numThreads,
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ORC_GET_SPLITS #%d").build());
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("ORC_GET_SPLITS #%d").build());
 
       synchronized (Context.class) {
         if (footerCache == null && cacheStripeDetails) {
@@ -422,6 +386,9 @@ public class OrcInputFormat  implements 
               .initialCapacity(cacheStripeDetailsSize).softValues().build();
         }
       }
+      String value = conf.get(ValidTxnList.VALID_TXNS_KEY,
+                              Long.MAX_VALUE + ":");
+      transactionList = new ValidTxnListImpl(value);
     }
 
     int getSchedulers() {
@@ -432,9 +399,9 @@ public class OrcInputFormat  implements 
      * Get the Nth split.
      * @param index if index >= 0, count from the front, otherwise count from
      *     the back.
-     * @result the Nth file split
+     * @return the Nth file split
      */
-    FileSplitInfo getResult(int index) {
+    OrcSplit getResult(int index) {
       if (index >= 0) {
         return splits.get(index);
       } else {
@@ -452,7 +419,8 @@ public class OrcInputFormat  implements 
      */
     synchronized void schedule(Runnable runnable) {
       if (fatalError == null) {
-        if (runnable instanceof FileGenerator || runnable instanceof SplitGenerator) {
+        if (runnable instanceof FileGenerator ||
+            runnable instanceof SplitGenerator) {
           schedulers += 1;
         }
         threadPool.execute(runnable);
@@ -513,23 +481,65 @@ public class OrcInputFormat  implements 
       this.dir = dir;
     }
 
+    private void scheduleSplits(FileStatus file,
+                                boolean isOriginal,
+                                boolean hasBase,
+                                List<Long> deltas) throws IOException{
+      FileInfo info = null;
+      if (context.cacheStripeDetails) {
+        info = verifyCachedFileInfo(file);
+      }
+      new SplitGenerator(context, fs, file, info, isOriginal, deltas,
+          hasBase).schedule();
+    }
+
     /**
      * For each path, get the list of files and blocks that they consist of.
      */
     @Override
     public void run() {
       try {
-        Iterator<FileStatus> itr = context.shims.listLocatedStatus(fs, dir,
-            hiddenFileFilter);
-        while (itr.hasNext()) {
-          FileStatus file = itr.next();
-          if (!file.isDir()) {
-            FileInfo fileInfo = null;
-            if (context.cacheStripeDetails) {
-              fileInfo = verifyCachedFileInfo(file);
+        AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
+            context.conf, context.transactionList);
+        List<Long> deltas =
+            AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+        Path base = dirInfo.getBaseDirectory();
+        List<FileStatus> original = dirInfo.getOriginalFiles();
+
+        boolean[] covered = new boolean[context.numBuckets];
+        boolean isOriginal = base == null;
+
+        // if we have a base to work from
+        if (base != null || !original.isEmpty()) {
+
+          // find the base files (original or new style)
+          List<FileStatus> children = original;
+          if (base != null) {
+            children = SHIMS.listLocatedStatus(fs, base,
+               AcidUtils.hiddenFileFilter);
+          }
+
+          // for each child, schedule splits and mark off the bucket
+          for(FileStatus child: children) {
+            AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
+                (child.getPath(), context.conf);
+            scheduleSplits(child, isOriginal, true, deltas);
+            int b = opts.getBucket();
+            // If the bucket is in the valid range, mark it as covered.
+            // I wish Hive actually enforced bucketing all of the time.
+            if (b >= 0 && b < covered.length) {
+              covered[b] = true;
             }
-            SplitGenerator spgen = new SplitGenerator(context, fs, file, fileInfo);
-            spgen.schedule();
+          }
+        }
+
+        // Generate a split for any buckets that weren't covered.
+        // This happens in the case where a bucket just has deltas and no
+        // base.
+        for(int b=0; b < context.numBuckets; ++b) {
+          if (!covered[b]) {
+            context.splits.add(new OrcSplit(dir, b, 0, new String[0], null,
+                               false, false, deltas));
           }
         }
       } catch (Throwable th) {
@@ -554,7 +564,8 @@ public class OrcInputFormat  implements 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Info cached for path: " + file.getPath());
         }
-        if (fileInfo.modificationTime == file.getModificationTime() && fileInfo.size == file.getLen()) {
+        if (fileInfo.modificationTime == file.getModificationTime() &&
+            fileInfo.size == file.getLen()) {
           // Cached copy is valid
           context.cacheHitCounter.incrementAndGet();
           return fileInfo;
@@ -562,10 +573,12 @@ public class OrcInputFormat  implements 
           // Invalidate
           Context.footerCache.invalidate(file.getPath());
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Meta-Info for : " + file.getPath() + " changed. CachedModificationTime: "
+            LOG.debug("Meta-Info for : " + file.getPath() +
+                " changed. CachedModificationTime: "
               + fileInfo.modificationTime + ", CurrentModificationTime: "
               + file.getModificationTime()
-              + ", CachedLength: " + fileInfo.size + ", CurrentLength: " + file.getLen());
+              + ", CachedLength: " + fileInfo.size + ", CurrentLength: " +
+                file.getLen());
           }
         }
       } else {
@@ -588,20 +601,28 @@ public class OrcInputFormat  implements 
     private final long blockSize;
     private final BlockLocation[] locations;
     private final FileInfo fileInfo;
-    private Iterable<StripeInformation> stripes;
-    private FileMetaInfo fileMetaInfo;
+    private List<StripeInformation> stripes;
+    private ReaderImpl.FileMetaInfo fileMetaInfo;
     private Metadata metadata;
     private List<OrcProto.Type> types;
-
+    private final boolean isOriginal;
+    private final List<Long> deltas;
+    private final boolean hasBase;
 
     SplitGenerator(Context context, FileSystem fs,
-                   FileStatus file, FileInfo fileInfo) throws IOException {
+                   FileStatus file, FileInfo fileInfo,
+                   boolean isOriginal,
+                   List<Long> deltas,
+                   boolean hasBase) throws IOException {
       this.context = context;
       this.fs = fs;
       this.file = file;
       this.blockSize = file.getBlockSize();
       this.fileInfo = fileInfo;
-      locations = context.shims.getLocations(fs, file);
+      locations = SHIMS.getLocations(fs, file);
+      this.isOriginal = isOriginal;
+      this.deltas = deltas;
+      this.hasBase = hasBase;
     }
 
     Path getPath() {
@@ -612,8 +633,8 @@ public class OrcInputFormat  implements 
       if(locations.length == 1 && file.getLen() < context.maxSize) {
         String[] hosts = locations[0].getHosts();
         synchronized (context.splits) {
-          context.splits.add(new Context.FileSplitInfo(file.getPath(), 0,
-              file.getLen(), hosts, fileMetaInfo));
+          context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(),
+                hosts, fileMetaInfo, isOriginal, hasBase, deltas));
         }
       } else {
         // if it requires a compute task
@@ -655,7 +676,8 @@ public class OrcInputFormat  implements 
      * @param fileMetaInfo file metadata from footer and postscript
      * @throws IOException
      */
-    void createSplit(long offset, long length, FileMetaInfo fileMetaInfo) throws IOException {
+    void createSplit(long offset, long length,
+                     ReaderImpl.FileMetaInfo fileMetaInfo) throws IOException {
       String[] hosts;
       if ((offset % blockSize) + length <= blockSize) {
         // handle the single block case
@@ -699,8 +721,8 @@ public class OrcInputFormat  implements 
         hostList.toArray(hosts);
       }
       synchronized (context.splits) {
-        context.splits.add(new Context.FileSplitInfo(file.getPath(), offset,
-            length, hosts, fileMetaInfo));
+        context.splits.add(new OrcSplit(file.getPath(), offset, length,
+            hosts, fileMetaInfo, isOriginal, hasBase, deltas));
       }
     }
 
@@ -712,30 +734,43 @@ public class OrcInputFormat  implements 
     public void run() {
       try {
         populateAndCacheStripeDetails();
-        Configuration conf = context.conf;
-        SearchArgument sarg = createSarg(types, conf);
-        List<StripeStatistics> stripeStats = null;
-        int[] filterColumns = null;
-        if (sarg != null) {
-          List<PredicateLeaf> sargLeaves = null;
-          String[] allColumns = conf.get(serdeConstants.LIST_COLUMNS).split(",");
-          String[] neededColumns = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR).split(",");
-          sargLeaves = sarg.getLeaves();
-          filterColumns = new int[sargLeaves.size()];
-          for (int i = 0; i < filterColumns.length; ++i) {
-            String colName = sargLeaves.get(i).getColumnName();
-
-            // if needed columns does not contain the column specified in filter expression then
-            // it must be partition column. There will not be columns within ORC file for partitioned
-            // column, so we can ignore them
-            if (containsColumn(neededColumns, colName)) {
-              filterColumns[i] = RecordReaderImpl.findColumns(allColumns, colName);
-            } else {
-              filterColumns[i] = -1;
+
+        // figure out which stripes we need to read
+        boolean[] includeStripe = null;
+        // we can't eliminate stripes if there are deltas because the
+        // deltas may change the rows making them match the predicate.
+        if (deltas.isEmpty()) {
+          Reader.Options options = new Reader.Options();
+          setIncludedColumns(options, types, context.conf, isOriginal);
+          setSearchArgument(options, types, context.conf, isOriginal);
+          if (options.getSearchArgument() != null) {
+            SearchArgument sarg = options.getSearchArgument();
+            List<PredicateLeaf> sargLeaves = sarg.getLeaves();
+            List<StripeStatistics> stripeStats = metadata.getStripeStatistics();
+            int[] filterColumns = RecordReaderImpl.mapSargColumns(sargLeaves,
+                options.getColumnNames(), getRootColumn(isOriginal));
+
+            if (stripeStats != null) {
+              // drop stripes that fail the predicate; keep any lacking stats
+              includeStripe = new boolean[stripes.size()];
+              for(int i=0; i < stripes.size(); ++i) {
+                includeStripe[i] = (i >= stripeStats.size()) ||
+                    isStripeSatisfyPredicate(stripeStats.get(i), sarg,
+                                             filterColumns);
+                if (LOG.isDebugEnabled() && !includeStripe[i]) {
+                  LOG.debug("Eliminating ORC stripe-" + i + " of file '" +
+                            file.getPath() + "'  as it did not satisfy " +
+                            "predicate condition.");
+                }
+              }
             }
           }
+        }
 
-          stripeStats = metadata.getStripeStatistics();
+        // if we didn't have predicate pushdown, read everything
+        if (includeStripe == null) {
+          includeStripe = new boolean[stripes.size()];
+          Arrays.fill(includeStripe, true);
         }
 
         long currentOffset = -1;
@@ -744,18 +779,7 @@ public class OrcInputFormat  implements 
         for(StripeInformation stripe: stripes) {
           idx++;
 
-          // eliminate stripes that doesn't satisfy the predicate condition
-          if (sarg != null &&
-              stripeStats != null &&
-              idx < stripeStats.size() &&
-              !isStripeSatisfyPredicate(stripeStats.get(idx), sarg, filterColumns)) {
-
-            // if a stripe doesn't satisfy predicate condition then skip it
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Eliminating ORC stripe-" + idx + " of file '" + file.getPath()
-                  + "'  as it did not satisfy predicate condition.");
-            }
-
+          if (!includeStripe[idx]) {
             // create split for the previous unfinished stripe
             if (currentOffset != -1) {
               createSplit(currentOffset, currentLength, fileMetaInfo);
@@ -776,7 +800,8 @@ public class OrcInputFormat  implements 
             currentOffset = stripe.getOffset();
             currentLength = stripe.getLength();
           } else {
-            currentLength = (stripe.getOffset() + stripe.getLength()) - currentOffset;
+            currentLength =
+                (stripe.getOffset() + stripe.getLength()) - currentOffset;
           }
           if (currentLength >= context.maxSize) {
             createSplit(currentOffset, currentLength, fileMetaInfo);
@@ -804,32 +829,32 @@ public class OrcInputFormat  implements 
     private void populateAndCacheStripeDetails() {
       try {
         Reader orcReader;
-        boolean found = false;
         if (fileInfo != null) {
-          found = true;
           stripes = fileInfo.stripeInfos;
           fileMetaInfo = fileInfo.fileMetaInfo;
           metadata = fileInfo.metadata;
           types = fileInfo.types;
           // For multiple runs, in case sendSplitsInFooter changes
           if (fileMetaInfo == null && context.footerInSplits) {
-            orcReader = OrcFile.createReader(fs, file.getPath(), context.conf);
-            fileInfo.fileMetaInfo = orcReader.getFileMetaInfo();
+            orcReader = OrcFile.createReader(file.getPath(),
+                OrcFile.readerOptions(context.conf).filesystem(fs));
+            fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo();
             fileInfo.metadata = orcReader.getMetadata();
             fileInfo.types = orcReader.getTypes();
           }
-        }
-        if (!found) {
-          orcReader = OrcFile.createReader(fs, file.getPath(), context.conf);
+        } else {
+          orcReader = OrcFile.createReader(file.getPath(),
+              OrcFile.readerOptions(context.conf).filesystem(fs));
           stripes = orcReader.getStripes();
           metadata = orcReader.getMetadata();
           types = orcReader.getTypes();
-          fileMetaInfo = context.footerInSplits ? orcReader.getFileMetaInfo() : null;
+          fileMetaInfo = context.footerInSplits ?
+              ((ReaderImpl) orcReader).getFileMetaInfo() : null;
           if (context.cacheStripeDetails) {
             // Populate into cache.
             Context.footerCache.put(file.getPath(),
-                new FileInfo(file.getModificationTime(), file.getLen(), stripes, metadata, 
-                             types, fileMetaInfo));
+                new FileInfo(file.getModificationTime(), file.getLen(), stripes,
+                    metadata, types, fileMetaInfo));
           }
         }
       } catch (Throwable th) {
@@ -845,45 +870,35 @@ public class OrcInputFormat  implements 
       }
     }
 
-    private boolean containsColumn(String[] neededColumns, String colName) {
-      for (String col : neededColumns) {
-        if (colName.equalsIgnoreCase(col)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
     private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics,
-        SearchArgument sarg, int[] filterColumns) {
-      if (sarg != null && filterColumns != null) {
-        List<PredicateLeaf> predLeaves = sarg.getLeaves();
-        TruthValue[] truthValues = new TruthValue[predLeaves.size()];
-        for (int pred = 0; pred < truthValues.length; pred++) {
-          if (filterColumns[pred] != -1) {
-
-            // column statistics at index 0 contains only the number of rows
-            ColumnStatistics stats = stripeStatistics.getColumnStatistics()[filterColumns[pred] + 1];
-            Object minValue = RecordReaderImpl.getMin(stats);
-            Object maxValue = RecordReaderImpl.getMax(stats);
-            PredicateLeaf predLeaf = predLeaves.get(pred);
-            truthValues[pred] = RecordReaderImpl.evaluatePredicateRange(predLeaf, minValue, maxValue);
-          } else {
+                                             SearchArgument sarg,
+                                             int[] filterColumns) {
+      List<PredicateLeaf> predLeaves = sarg.getLeaves();
+      TruthValue[] truthValues = new TruthValue[predLeaves.size()];
+      for (int pred = 0; pred < truthValues.length; pred++) {
+        if (filterColumns[pred] != -1) {
+
+          // column statistics at index 0 contains only the number of rows
+          ColumnStatistics stats =
+              stripeStatistics.getColumnStatistics()[filterColumns[pred]];
+          Object minValue = RecordReaderImpl.getMin(stats);
+          Object maxValue = RecordReaderImpl.getMax(stats);
+          truthValues[pred] =
+              RecordReaderImpl.evaluatePredicateRange(predLeaves.get(pred),
+                  minValue, maxValue);
+        } else {
 
-            // parition column case.
-            // partition filter will be evaluated by partition pruner so
-            // we will not evaluate partition filter here.
-            truthValues[pred] = TruthValue.YES_NO_NULL;
-          }
+          // partition column case.
+          // partition filter will be evaluated by partition pruner so
+          // we will not evaluate partition filter here.
+          truthValues[pred] = TruthValue.YES_NO_NULL;
         }
-        return sarg.evaluate(truthValues).isNeeded();
       }
-      return true;
+      return sarg.evaluate(truthValues).isNeeded();
     }
-
   }
 
-  static List<Context.FileSplitInfo> generateSplitsInfo(Configuration conf)
+  static List<OrcSplit> generateSplitsInfo(Configuration conf)
       throws IOException {
 	  // use threads to resolve directories into splits
 	  Context context = new Context(conf);
@@ -911,20 +926,14 @@ public class OrcInputFormat  implements 
     }
 	  return context.splits;
   }
+
   @Override
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
-    List<OrcInputFormat.Context.FileSplitInfo> splits =
-        OrcInputFormat.generateSplitsInfo(job);
-    InputSplit[] result = new InputSplit[splits.size()];
-    for (int i=0;i<splits.size();i++) {
-      OrcInputFormat.Context.FileSplitInfo split = splits.get(i);
-      result[i] = new OrcSplit(split.getPath(), split.getStart(),
-          split.getLength(), split.getLocations(), split.getFileMetaInfo());
-    }
+    List<OrcSplit> result = generateSplitsInfo(job);
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
-    return result;
+    return result.toArray(new InputSplit[result.size()]);
   }
 
   /**
@@ -936,14 +945,16 @@ public class OrcInputFormat  implements 
   private static class FileInfo {
     long modificationTime;
     long size;
-    Iterable<StripeInformation> stripeInfos;
-    FileMetaInfo fileMetaInfo;
+    List<StripeInformation> stripeInfos;
+    ReaderImpl.FileMetaInfo fileMetaInfo;
     Metadata metadata;
     List<OrcProto.Type> types;
 
 
-    FileInfo(long modificationTime, long size, Iterable<StripeInformation> stripeInfos, 
-        Metadata metadata, List<OrcProto.Type> types, FileMetaInfo fileMetaInfo) {
+    FileInfo(long modificationTime, long size,
+             List<StripeInformation> stripeInfos,
+             Metadata metadata, List<OrcProto.Type> types,
+             ReaderImpl.FileMetaInfo fileMetaInfo) {
       this.modificationTime = modificationTime;
       this.size = size;
       this.stripeInfos = stripeInfos;
@@ -952,4 +963,219 @@ public class OrcInputFormat  implements 
       this.types = types;
     }
   }
+
+  @SuppressWarnings("unchecked")
+  private org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
+    createVectorizedReader(InputSplit split, JobConf conf, Reporter reporter
+                           ) throws IOException {
+    return (org.apache.hadoop.mapred.RecordReader)
+      new VectorizedOrcInputFormat().getRecordReader(split, conf, reporter);
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
+  getRecordReader(InputSplit inputSplit, JobConf conf,
+                  Reporter reporter) throws IOException {
+    boolean vectorMode = Utilities.isVectorMode(conf);
+
+    // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits,
+    // we know it is not ACID.
+    if (inputSplit.getClass() == FileSplit.class) {
+      if (vectorMode) {
+        return createVectorizedReader(inputSplit, conf, reporter);
+      }
+      return new OrcRecordReader(OrcFile.createReader(
+          ((FileSplit) inputSplit).getPath(),
+          OrcFile.readerOptions(conf)), conf, (FileSplit) inputSplit);
+    }
+
+    OrcSplit split = (OrcSplit) inputSplit;
+    // TODO vectorized reader doesn't work with the new format yet
+    if (vectorMode) {
+      if (!split.getDeltas().isEmpty() || !split.isOriginal()) {
+        throw new IOException("Vectorization and ACID tables are incompatible."
+                              );
+      }
+      return createVectorizedReader(inputSplit, conf, reporter);
+    }
+    reporter.setStatus(inputSplit.toString());
+
+    // if we are strictly old-school, just use the old code
+    if (split.isOriginal() && split.getDeltas().isEmpty()) {
+      return new OrcRecordReader(OrcFile.createReader(split.getPath(),
+          OrcFile.readerOptions(conf)), conf, split);
+    }
+
+    Options options = new Options(conf).reporter(reporter);
+    final RowReader<OrcStruct> inner = getReader(inputSplit, options);
+    final RecordIdentifier id = inner.createKey();
+
+    // Return a RecordReader that is compatible with the Hive 0.12 reader
+    // with NullWritable for the key instead of RecordIdentifier.
+    return new org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>(){
+      @Override
+      public boolean next(NullWritable nullWritable,
+                          OrcStruct orcStruct) throws IOException {
+        return inner.next(id, orcStruct);
+      }
+
+      @Override
+      public NullWritable createKey() {
+        return NullWritable.get();
+      }
+
+      @Override
+      public OrcStruct createValue() {
+        return inner.createValue();
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return inner.getPos();
+      }
+
+      @Override
+      public void close() throws IOException {
+        inner.close();
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        return inner.getProgress();
+      }
+    };
+  }
+
+
+  @Override
+  public RowReader<OrcStruct> getReader(InputSplit inputSplit,
+                                        Options options) throws IOException {
+    final OrcSplit split = (OrcSplit) inputSplit;
+    final Path path = split.getPath();
+    Path root;
+    if (split.hasBase()) {
+      if (split.isOriginal()) {
+        root = path.getParent();
+      } else {
+        root = path.getParent().getParent();
+      }
+    } else {
+      root = path;
+    }
+    final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas());
+    final Configuration conf = options.getConfiguration();
+    final Reader reader;
+    final int bucket;
+    Reader.Options readOptions = new Reader.Options();
+    readOptions.range(split.getStart(), split.getLength());
+    if (split.hasBase()) {
+      bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
+          .getBucket();
+      reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+      final List<OrcProto.Type> types = reader.getTypes();
+      setIncludedColumns(readOptions, types, conf, split.isOriginal());
+      setSearchArgument(readOptions, types, conf, split.isOriginal());
+    } else {
+      bucket = (int) split.getStart();
+      reader = null;
+    }
+    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
+                                Long.MAX_VALUE + ":");
+    ValidTxnList validTxnList = new ValidTxnListImpl(txnString);
+    final OrcRawRecordMerger records =
+        new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
+            validTxnList, readOptions, deltas);
+    return new RowReader<OrcStruct>() {
+      OrcStruct innerRecord = records.createValue();
+
+      @Override
+      public ObjectInspector getObjectInspector() {
+        return ((StructObjectInspector) reader.getObjectInspector())
+            .getAllStructFieldRefs().get(OrcRecordUpdater.ROW)
+            .getFieldObjectInspector();
+      }
+
+      @Override
+      public boolean next(RecordIdentifier recordIdentifier,
+                          OrcStruct orcStruct) throws IOException {
+        boolean result;
+        // filter out the deleted records
+        do {
+          result = records.next(recordIdentifier, innerRecord);
+        } while (result &&
+            OrcRecordUpdater.getOperation(innerRecord) ==
+                OrcRecordUpdater.DELETE_OPERATION);
+        if (result) {
+          // swap the fields with the passed in orcStruct
+          orcStruct.linkFields(OrcRecordUpdater.getRow(innerRecord));
+        }
+        return result;
+      }
+
+      @Override
+      public RecordIdentifier createKey() {
+        return records.createKey();
+      }
+
+      @Override
+      public OrcStruct createValue() {
+        return new OrcStruct(records.getColumns());
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return records.getPos();
+      }
+
+      @Override
+      public void close() throws IOException {
+        records.close();
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        return records.getProgress();
+      }
+    };
+  }
+
+  static Path findOriginalBucket(FileSystem fs,
+                                 Path directory,
+                                 int bucket) throws IOException {
+    for(FileStatus stat: fs.listStatus(directory)) {
+      String name = stat.getPath().getName();
+      if (Integer.parseInt(name.substring(0, name.indexOf('_'))) == bucket) {
+        return stat.getPath();
+      }
+    }
+    throw new IllegalArgumentException("Can't find bucket " + bucket + " in " +
+        directory);
+  }
+
+  @Override
+  public RawReader<OrcStruct> getRawReader(Configuration conf,
+                                           boolean collapseEvents,
+                                           int bucket,
+                                           ValidTxnList validTxnList,
+                                           Path baseDirectory,
+                                           Path[] deltaDirectory
+                                           ) throws IOException {
+    Reader reader = null;
+    boolean isOriginal = false;
+    if (baseDirectory != null) {
+      Path bucketFile;
+      if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
+        bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
+      } else {
+        isOriginal = true;
+        bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf),
+            baseDirectory, bucket);
+      }
+      reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+    }
+    return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
+        bucket, validTxnList, new Reader.Options(), deltaDirectory);
+  }
+
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Wed Mar 26 18:14:37 2014
@@ -48,8 +48,8 @@ public class OrcNewInputFormat extends I
     Path path = fileSplit.getPath();
     Configuration conf = ShimLoader.getHadoopShims()
         .getConfiguration(context);
-    FileSystem fs = path.getFileSystem(conf);
-    return new OrcRecordReader(OrcFile.createReader(fs, path, conf),
+    return new OrcRecordReader(OrcFile.createReader(path,
+                                                   OrcFile.readerOptions(conf)),
         ShimLoader.getHadoopShims().getConfiguration(context),
         fileSplit.getStart(), fileSplit.getLength());
   }
@@ -118,15 +118,13 @@ public class OrcNewInputFormat extends I
   public List<InputSplit> getSplits(JobContext jobContext)
       throws IOException, InterruptedException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
-    List<OrcInputFormat.Context.FileSplitInfo> splits =
-        OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
-        .getConfiguration(jobContext));
-    List<InputSplit> result = new ArrayList<InputSplit>();
-    for (OrcInputFormat.Context.FileSplitInfo split : splits) {
-      FileSplit newSplit = new OrcNewSplit(split.getPath(),
-          split.getStart(), split.getLength(), split.getLocations(),
-          split.getFileMetaInfo());
-      result.add(newSplit);
+    Configuration conf =
+        ShimLoader.getHadoopShims().getConfiguration(jobContext);
+    // Generate the splits exactly once, then wrap each for the mapreduce API.
+    List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf);
+    List<InputSplit> result = new ArrayList<InputSplit>(splits.size());
+    for(OrcSplit split: splits) {
+      result.add(new OrcNewSplit(split));
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
     return result;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java Wed Mar 26 18:14:37 2014
@@ -21,9 +21,9 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -33,31 +33,42 @@ import org.apache.hadoop.mapreduce.lib.i
  *
  */
 public class OrcNewSplit extends FileSplit {
-  private Reader.FileMetaInfo fileMetaInfo;
+  private ReaderImpl.FileMetaInfo fileMetaInfo;
   private boolean hasFooter;
-  
+  private boolean isOriginal;
+  private boolean hasBase;
+  private final List<Long> deltas = new ArrayList<Long>();
+
   protected OrcNewSplit(){
     //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
     //This constructor is used to create the object and then call readFields()
     // so just pass nulls to this super constructor.
-    super(null, 0, 0, (String[])null);
+    super(null, 0, 0, null);
   }
   
-  public OrcNewSplit(Path path, long offset, long length, String[] hosts,
-      FileMetaInfo fileMetaInfo) {
-    super(path, offset, length, hosts);
-    this.fileMetaInfo = fileMetaInfo;
-    hasFooter = this.fileMetaInfo != null;
+  public OrcNewSplit(OrcSplit inner) throws IOException {
+    super(inner.getPath(), inner.getStart(), inner.getLength(),
+          inner.getLocations());
+    this.fileMetaInfo = inner.getFileMetaInfo();
+    this.hasFooter = inner.hasFooter();
+    this.isOriginal = inner.isOriginal();
+    this.hasBase = inner.hasBase();
+    this.deltas.addAll(inner.getDeltas());
   }
-  
+
   @Override
   public void write(DataOutput out) throws IOException {
     //serialize path, offset, length using FileSplit
     super.write(out);
 
-    // Whether footer information follows.
-    out.writeBoolean(hasFooter);
-
+    int flags = (hasBase ? OrcSplit.BASE_FLAG : 0) |
+        (isOriginal ? OrcSplit.ORIGINAL_FLAG : 0) |
+        (hasFooter ? OrcSplit.FOOTER_FLAG : 0);
+    out.writeByte(flags);
+    out.writeInt(deltas.size());
+    for(Long delta: deltas) {
+      out.writeLong(delta);
+    }
     if (hasFooter) {
       // serialize FileMetaInfo fields
       Text.writeString(out, fileMetaInfo.compressionType);
@@ -74,14 +85,22 @@ public class OrcNewSplit extends FileSpl
           footerBuff.limit() - footerBuff.position());
     }
   }
-  
+
   @Override
   public void readFields(DataInput in) throws IOException {
     //deserialize path, offset, length using FileSplit
     super.readFields(in);
 
-    hasFooter = in.readBoolean();
-
+    byte flags = in.readByte();
+    hasFooter = (OrcSplit.FOOTER_FLAG & flags) != 0;
+    isOriginal = (OrcSplit.ORIGINAL_FLAG & flags) != 0;
+    hasBase = (OrcSplit.BASE_FLAG & flags) != 0;
+
+    deltas.clear();
+    int numDeltas = in.readInt();
+    for(int i=0; i < numDeltas; i++) {
+      deltas.add(in.readLong());
+    }
     if (hasFooter) {
       // deserialize FileMetaInfo fields
       String compressionType = Text.readString(in);
@@ -93,15 +112,28 @@ public class OrcNewSplit extends FileSpl
       ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
       in.readFully(footerBuff.array(), 0, footerBuffSize);
 
-      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, metadataSize, footerBuff);
+      fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
+          metadataSize, footerBuff);
     }
   }
 
-  public FileMetaInfo getFileMetaInfo(){
+  ReaderImpl.FileMetaInfo getFileMetaInfo(){
     return fileMetaInfo;
   }
 
   public boolean hasFooter() {
     return hasFooter;
   }
+
+  public boolean isOriginal() {
+    return isOriginal;
+  }
+
+  public boolean hasBase() {
+    return hasBase;
+  }
+
+  public List<Long> getDeltas() {
+    return deltas;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Wed Mar 26 18:14:37 2014
@@ -19,12 +19,19 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.FSRecordWriter;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -34,6 +41,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.Progressable;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Properties;
 
@@ -41,7 +49,7 @@ import java.util.Properties;
  * A Hive OutputFormat for ORC files.
  */
 public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
-                        implements HiveOutputFormat<NullWritable, OrcSerdeRow> {
+                        implements AcidOutputFormat<OrcSerdeRow> {
 
   private static class OrcRecordWriter
       implements RecordWriter<NullWritable, OrcSerdeRow>,
@@ -179,4 +187,135 @@ public class OrcOutputFormat extends Fil
                          Progressable reporter) throws IOException {
     return new OrcRecordWriter(path, getOptions(conf,tableProperties));
   }
+
+  private class DummyOrcRecordUpdater implements RecordUpdater {
+    private final Path path;
+    private final ObjectInspector inspector;
+    private final PrintStream out;
+
+    private DummyOrcRecordUpdater(Path path, Options options) {
+      this.path = path;
+      this.inspector = options.getInspector();
+      this.out = options.getDummyStream();
+    }
+
+    @Override
+    public void insert(long currentTransaction, Object row) throws IOException {
+      out.println("insert " + path + " currTxn: " + currentTransaction +
+          " obj: " + stringifyObject(row, inspector));
+    }
+
+    @Override
+    public void update(long currentTransaction, long originalTransaction,
+                       long rowId, Object row) throws IOException {
+      out.println("update " + path + " currTxn: " + currentTransaction +
+          " origTxn: " + originalTransaction + " row: " + rowId + " obj: " +
+          stringifyObject(row, inspector));
+    }
+
+    @Override
+    public void delete(long currentTransaction, long originalTransaction,
+                       long rowId) throws IOException {
+      out.println("delete " + path + " currTxn: " + currentTransaction +
+         " origTxn: " + originalTransaction + " row: " + rowId);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.println("flush " + path);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      out.println("close " + path);
+    }
+
+    @Override
+    public SerDeStats getStats() {
+      return null;
+    }
+
+    /** Appends a readable rendering of obj, recursing into struct fields. */
+    private void stringifyObject(StringBuilder buffer, Object obj,
+                                 ObjectInspector inspector
+                                ) throws IOException {
+      if (inspector instanceof StructObjectInspector) {
+        buffer.append("{ ");
+        StructObjectInspector soi = (StructObjectInspector) inspector;
+        boolean isFirst = true;
+        for(StructField field: soi.getAllStructFieldRefs()) {
+          if (isFirst) {
+            isFirst = false;
+          } else {
+            buffer.append(", ");
+          }
+          buffer.append(field.getFieldName()).append(": ");
+          stringifyObject(buffer, soi.getStructFieldData(obj, field),
+              field.getFieldObjectInspector());
+        }
+        buffer.append(" }");
+      } else if (inspector instanceof PrimitiveObjectInspector) {
+        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) inspector;
+        // append(Object) prints "null" for a null value instead of throwing
+        buffer.append(poi.getPrimitiveJavaObject(obj));
+      } else {
+        buffer.append("*unknown*");
+      }
+    }
+
+    private String stringifyObject(Object obj,
+                                   ObjectInspector inspector
+                                  ) throws IOException {
+      StringBuilder buffer = new StringBuilder();
+      stringifyObject(buffer, obj, inspector);
+      return buffer.toString();
+    }
+  }
+
+  @Override
+  public RecordUpdater getRecordUpdater(Path path,
+                                        Options options) throws IOException {
+    if (options.getDummyStream() != null) {
+      return new DummyOrcRecordUpdater(path, options);
+    } else {
+      return new OrcRecordUpdater(path, options);
+    }
+  }
+
+  @Override
+  public FSRecordWriter getRawRecordWriter(Path path,
+                                           Options options) throws IOException {
+    final Path filename = AcidUtils.createFilename(path, options);
+    final OrcFile.WriterOptions opts =
+        OrcFile.writerOptions(options.getConfiguration());
+    if (!options.isWritingBase()) {
+      opts.bufferSize(OrcRecordUpdater.DELTA_BUFFER_SIZE)
+          .stripeSize(OrcRecordUpdater.DELTA_STRIPE_SIZE)
+          .blockPadding(false)
+          .compress(CompressionKind.NONE)
+          .rowIndexStride(0);
+    }
+    final OrcRecordUpdater.KeyIndexBuilder watcher =
+        new OrcRecordUpdater.KeyIndexBuilder();
+    opts.inspector(options.getInspector())
+        .callback(watcher);
+    final Writer writer = OrcFile.createWriter(filename, opts);
+    return new FSRecordWriter() {
+      @Override
+      public void write(Writable w) throws IOException {
+        OrcStruct orc = (OrcStruct) w;
+        watcher.addKey(
+            ((LongWritable)
+                orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(),
+            ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(),
+            ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get());
+        writer.addRow(w);
+      }
+
+      @Override
+      public void close(boolean abort) throws IOException {
+        writer.close();
+      }
+    };
+  }
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1581977&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java Wed Mar 26 18:14:37 2014
@@ -0,0 +1,661 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Merges a base and a list of delta files together into a single stream of
+ * events.
+ */
+public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
+
+  private static final Log LOG = LogFactory.getLog(OrcRawRecordMerger.class);
+
+  private final Configuration conf;
+  private final boolean collapse;
+  private final RecordReader baseReader;
+  private final long offset;
+  private final long length;
+  private final ValidTxnList validTxnList;
+  private final int columns;
+  private ReaderKey prevKey = new ReaderKey();
+  // this is the key less than the lowest key we need to process
+  private RecordIdentifier minKey;
+  // this is the last key we need to process
+  private RecordIdentifier maxKey;
+  // an extra value so that we can return it while reading ahead
+  private OrcStruct extraValue;
+
+  /**
+   * A RecordIdentifier extended with the current transaction id. This is the
+   * key of our merge sort with the originalTransaction, bucket, and rowId
+   * ascending and the currentTransaction descending. This means that if the
+   * reader is collapsing events to just the last update, just the first
+   * instance of each record is required.
+   */
+  final static class ReaderKey extends RecordIdentifier{
+    private long currentTransactionId;
+
+    /**
+     * Create a key with every component set to -1.
+     */
+    public ReaderKey() {
+      this(-1, -1, -1, -1);
+    }
+
+    public ReaderKey(long originalTransaction, int bucket, long rowId,
+                     long currentTransactionId) {
+      super(originalTransaction, bucket, rowId);
+      this.currentTransactionId = currentTransactionId;
+    }
+
+    /**
+     * Copy all of the fields from another key.
+     * @param other the key to copy, which must be a ReaderKey; anything else
+     *              fails with a ClassCastException
+     */
+    @Override
+    public void set(RecordIdentifier other) {
+      super.set(other);
+      currentTransactionId = ((ReaderKey) other).currentTransactionId;
+    }
+
+    /**
+     * Set all four components of the key at once.
+     */
+    public void setValues(long originalTransactionId,
+                          int bucket,
+                          long rowId,
+                          long currentTransactionId) {
+      setValues(originalTransactionId, bucket, rowId);
+      this.currentTransactionId = currentTransactionId;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      // the instanceof guard keeps the cast safe regardless of how the
+      // parent class decides equality
+      return super.equals(other) &&
+          other instanceof ReaderKey &&
+          currentTransactionId == ((ReaderKey) other).currentTransactionId;
+    }
+
+    /**
+     * Hash all four components so that keys that are equal always hash the
+     * same, as required when equals is overridden.
+     */
+    @Override
+    public int hashCode() {
+      // widen each component to long so the same folding applies to all
+      final long txn = getTransactionId();
+      final long bucketId = getBucketId();
+      final long row = getRowId();
+      int result = (int) (txn ^ (txn >>> 32));
+      result = 31 * result + (int) (bucketId ^ (bucketId >>> 32));
+      result = 31 * result + (int) (row ^ (row >>> 32));
+      result = 31 * result +
+          (int) (currentTransactionId ^ (currentTransactionId >>> 32));
+      return result;
+    }
+
+    /**
+     * Order by the row (via compareToInternal) and then by the current
+     * transaction id descending. When the rows match and the other key is
+     * not a ReaderKey, this key sorts first.
+     */
+    @Override
+    public int compareTo(RecordIdentifier other) {
+      int sup = compareToInternal(other);
+      if (sup == 0) {
+        if (other.getClass() == ReaderKey.class) {
+          ReaderKey oth = (ReaderKey) other;
+          if (currentTransactionId != oth.currentTransactionId) {
+            // reversed on purpose: the larger transaction id sorts first
+            return currentTransactionId < oth.currentTransactionId ? +1 : -1;
+          }
+        } else {
+          return -1;
+        }
+      }
+      return sup;
+    }
+
+    public long getCurrentTransactionId() {
+      return currentTransactionId;
+    }
+
+    /**
+     * Compare rows without considering the currentTransactionId.
+     * @param other the value to compare to
+     * @return -1, 0, +1
+     */
+    public int compareRow(RecordIdentifier other) {
+      return compareToInternal(other);
+    }
+
+    @Override
+    public String toString() {
+      return "{originalTxn: " + getTransactionId() + ", bucket: " +
+          getBucketId() + ", row: " + getRowId() + ", currentTxn: " +
+          currentTransactionId + "}";
+    }
+  }
+
+  /**
+   * A reader and the next record from that reader. The code reads ahead so that
+   * we can return the lowest ReaderKey from each of the readers. Thus, the
+   * next available row is nextRecord and only following records are still in
+   * the reader.
+   */
+  static class ReaderPair {
+    // the record that has been read ahead, or null once the reader is done
+    OrcStruct nextRecord;
+    final Reader reader;
+    final RecordReader recordReader;
+    // the key of nextRecord; it is overwritten on every call to next
+    final ReaderKey key;
+    final RecordIdentifier maxKey;
+    final int bucket;
+
+    /**
+     * Create a reader that reads from the first key larger than minKey to any
+     * keys equal to maxKey.
+     * @param key the key to read into
+     * @param reader the ORC file reader
+     * @param bucket the bucket number for the file
+     * @param minKey only return keys larger than minKey if it is non-null
+     * @param maxKey only return keys less than or equal to maxKey if it is
+     *               non-null
+     * @param options options to provide to read the rows.
+     * @throws IOException
+     */
+    ReaderPair(ReaderKey key, Reader reader, int bucket,
+               RecordIdentifier minKey, RecordIdentifier maxKey,
+               ReaderImpl.Options options) throws IOException {
+      this.reader = reader;
+      this.key = key;
+      this.maxKey = maxKey;
+      this.bucket = bucket;
+      // TODO use stripe statistics to jump over stripes
+      recordReader = reader.rowsOptions(options);
+      // advance the reader until we reach the minimum key; note that next is
+      // overridden by subclasses and runs before their constructors do
+      do {
+        next(nextRecord);
+      } while (nextRecord != null &&
+          (minKey != null && key.compareRow(minKey) <= 0));
+    }
+
+    /**
+     * Read the following record into nextRecord and its key into key. Once
+     * the reader is exhausted or a key beyond maxKey is seen, nextRecord is
+     * set to null and the record reader is closed.
+     * @param next a record that is offered to the record reader for reuse,
+     *             which may be null
+     * @throws IOException
+     */
+    void next(OrcStruct next) throws IOException {
+      if (recordReader.hasNext()) {
+        nextRecord = (OrcStruct) recordReader.next(next);
+        // set the key
+        key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
+            OrcRecordUpdater.getBucket(nextRecord),
+            OrcRecordUpdater.getRowId(nextRecord),
+            OrcRecordUpdater.getCurrentTransaction(nextRecord));
+
+        // if this record is larger than maxKey, we need to stop
+        if (maxKey != null && key.compareRow(maxKey) > 0) {
+          // only build the message when it will actually be logged
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("key " + key + " > maxkey " + maxKey);
+          }
+          nextRecord = null;
+          recordReader.close();
+        }
+      } else {
+        nextRecord = null;
+        recordReader.close();
+      }
+    }
+
+    /**
+     * Get the number of subtypes of the type that follows the ROW field's
+     * position in the file's type list.
+     */
+    int getColumns() {
+      return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
+    }
+  }
+
+  /**
+   * A reader that pretends an original base file is a new version base file.
+   * It wraps the underlying reader's row with an ACID event object and
+   * makes the relevant translations.
+   */
+  static final class OriginalReaderPair extends ReaderPair {
+    OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
+                       RecordIdentifier minKey, RecordIdentifier maxKey,
+                       Reader.Options options) throws IOException {
+      super(key, reader, bucket, minKey, maxKey, options);
+    }
+
+    /**
+     * Read the following row and wrap it in an insert event whose original
+     * and current transaction are 0, whose bucket is this reader's bucket
+     * and whose row id is the row's number in the file. Once the reader is
+     * exhausted or a key beyond maxKey is seen, nextRecord is set to null
+     * and the record reader is closed.
+     * @param next an event to reuse, or null to allocate a new one
+     * @throws IOException
+     */
+    @Override
+    void next(OrcStruct next) throws IOException {
+      if (recordReader.hasNext()) {
+        // must be captured before the row is read
+        long nextRowId = recordReader.getRowNumber();
+        // have to do initialization here, because the super's constructor
+        // calls next and thus we need to initialize before our constructor
+        // runs
+        if (next == null) {
+          nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS);
+          IntWritable operation =
+              new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
+          nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation);
+          nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+              new LongWritable(0));
+          nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+              new LongWritable(0));
+          nextRecord.setFieldValue(OrcRecordUpdater.BUCKET,
+              new IntWritable(bucket));
+          nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID,
+              new LongWritable(nextRowId));
+          nextRecord.setFieldValue(OrcRecordUpdater.ROW,
+              recordReader.next(null));
+        } else {
+          nextRecord = next;
+          ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
+              .set(OrcRecordUpdater.INSERT_OPERATION);
+          ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
+              .set(0);
+          ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
+              .set(bucket);
+          ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
+              .set(0);
+          // the reused event must carry the same row id as a freshly
+          // allocated one so that the event agrees with its key
+          ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
+              .set(nextRowId);
+          nextRecord.setFieldValue(OrcRecordUpdater.ROW,
+              recordReader.next(OrcRecordUpdater.getRow(next)));
+        }
+        key.setValues(0L, bucket, nextRowId, 0L);
+        if (maxKey != null && key.compareRow(maxKey) > 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("key " + key + " > maxkey " + maxKey);
+          }
+          nextRecord = null;
+          recordReader.close();
+        }
+      } else {
+        nextRecord = null;
+        recordReader.close();
+      }
+    }
+
+    /**
+     * Get the number of subtypes of the file's first type, since an original
+     * file has no event wrapper around the row.
+     */
+    @Override
+    int getColumns() {
+      return reader.getTypes().get(0).getSubtypesCount();
+    }
+  }
+
+  private final TreeMap<ReaderKey, ReaderPair> readers =
+      new TreeMap<ReaderKey, ReaderPair>();
+
+  // The reader that currently has the lowest key.
+  private ReaderPair primary;
+
+  // The key of the next lowest reader.
+  private ReaderKey secondaryKey = null;
+
+  /**
+   * Find the key range for original bucket files.
+   * @param reader the reader
+   * @param bucket the bucket number we are reading
+   * @param options the options for reading with
+   * @throws IOException
+   */
+  private void discoverOriginalKeyBounds(Reader reader, int bucket,
+                                         Reader.Options options
+                                         ) throws IOException {
+    long rowLength = 0;
+    long rowOffset = 0;
+    long offset = options.getOffset();
+    long maxOffset = options.getMaxOffset();
+    boolean isTail = true;
+    for(StripeInformation stripe: reader.getStripes()) {
+      if (offset > stripe.getOffset()) {
+        rowOffset += stripe.getNumberOfRows();
+      } else if (maxOffset > stripe.getOffset()) {
+        rowLength += stripe.getNumberOfRows();
+      } else {
+        isTail = false;
+        break;
+      }
+    }
+    if (rowOffset > 0) {
+      minKey = new RecordIdentifier(0, bucket, rowOffset - 1);
+    }
+    if (!isTail) {
+      maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1);
+    }
+  }
+
+  /**
+   * Find the key range for bucket files.
+   * @param reader the reader
+   * @param options the options for reading with
+   * @throws IOException
+   */
+  private void discoverKeyBounds(Reader reader,
+                                 Reader.Options options) throws IOException {
+    RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
+    long offset = options.getOffset();
+    long maxOffset = options.getMaxOffset();
+    int firstStripe = 0;
+    int stripeCount = 0;
+    boolean isTail = true;
+    List<StripeInformation> stripes = reader.getStripes();
+    for(StripeInformation stripe: stripes) {
+      if (offset > stripe.getOffset()) {
+        firstStripe += 1;
+      } else if (maxOffset > stripe.getOffset()) {
+        stripeCount += 1;
+      } else {
+        isTail = false;
+        break;
+      }
+    }
+    if (firstStripe != 0) {
+      minKey = keyIndex[firstStripe - 1];
+    }
+    if (!isTail) {
+      maxKey = keyIndex[firstStripe + stripeCount - 1];
+    }
+  }
+
+  /**
+   * Convert from the row include/sarg/columnNames to the event equivalent
+   * for the underlying file.
+   * @param options options for the row reader
+   * @return a cloned options object that is modified for the event reader
+   */
+  static Reader.Options createEventOptions(Reader.Options options) {
+    Reader.Options result = options.clone();
+    result.range(options.getOffset(), Long.MAX_VALUE);
+    // slide the columns down by 6 for the include array
+    if (options.getInclude() != null) {
+      boolean[] orig = options.getInclude();
+      // we always need the base row
+      orig[0] = true;
+      boolean[] include = new boolean[orig.length + OrcRecordUpdater.FIELDS];
+      Arrays.fill(include, 0, OrcRecordUpdater.FIELDS, true);
+      for(int i= 0; i < orig.length; ++i) {
+        include[i + OrcRecordUpdater.FIELDS] = orig[i];
+      }
+      result.include(include);
+    }
+
+    // slide the column names down by 6 for the name array
+    if (options.getColumnNames() != null) {
+      String[] orig = options.getColumnNames();
+      String[] cols = new String[orig.length + OrcRecordUpdater.FIELDS];
+      for(int i=0; i < orig.length; ++i) {
+        cols[i + OrcRecordUpdater.FIELDS] = orig[i];
+      }
+      result.searchArgument(options.getSearchArgument(), cols);
+    }
+    return result;
+  }
+
+  /**
+   * Create a reader that merge sorts the ACID events together.
+   * @param conf the configuration
+   * @param collapseEvents should the events on the same row be collapsed
+   * @param reader the reader for the base file, or null if there is no base
+   * @param isOriginal is the base file a pre-acid file
+   * @param bucket the bucket we are reading
+   * @param validTxnList the list that decides which transactions' events
+   *                     are returned by next
+   * @param options the options to read with
+   * @param deltaDirectory the list of delta directories to include, which
+   *                       may be null
+   * @throws IOException
+   */
+  OrcRawRecordMerger(Configuration conf,
+                     boolean collapseEvents,
+                     Reader reader,
+                     boolean isOriginal,
+                     int bucket,
+                     ValidTxnList validTxnList,
+                     Reader.Options options,
+                     Path[] deltaDirectory) throws IOException {
+    this.conf = conf;
+    this.collapse = collapseEvents;
+    this.offset = options.getOffset();
+    this.length = options.getLength();
+    this.validTxnList = validTxnList;
+    // modify the options to reflect the event instead of the base row
+    Reader.Options eventOptions = createEventOptions(options);
+    if (reader == null) {
+      baseReader = null;
+    } else {
+
+      // find the min/max based on the offset and length
+      if (isOriginal) {
+        discoverOriginalKeyBounds(reader, bucket, options);
+      } else {
+        discoverKeyBounds(reader, options);
+      }
+      LOG.info("min key = " + minKey + ", max key = " + maxKey);
+      // use the min/max instead of the byte range
+      ReaderPair pair;
+      ReaderKey key = new ReaderKey();
+      if (isOriginal) {
+        // an original file has no event wrapper, so it is read with the row
+        // level options, extended to the end of the file
+        options = options.clone();
+        options.range(options.getOffset(), Long.MAX_VALUE);
+        pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey,
+                                      options);
+      } else {
+        pair = new ReaderPair(key, reader, bucket, minKey, maxKey,
+                              eventOptions);
+      }
+
+      // if there is at least one record, put it in the map
+      if (pair.nextRecord != null) {
+        readers.put(key, pair);
+      }
+      baseReader = pair.recordReader;
+    }
+
+    // we always want to read all of the deltas
+    eventOptions.range(0, Long.MAX_VALUE);
+    if (deltaDirectory != null) {
+      for(Path delta: deltaDirectory) {
+        ReaderKey key = new ReaderKey();
+        Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
+        FileSystem fs = deltaFile.getFileSystem(conf);
+        // this local hides the length field; it is the number of bytes of
+        // the delta file that may be read
+        long length = getLastFlushLength(fs, deltaFile);
+        // a length of -1 means the side file was opened but held no lengths
+        if (fs.exists(deltaFile) && length != -1) {
+          Reader deltaReader = OrcFile.createReader(deltaFile,
+              OrcFile.readerOptions(conf).maxLength(length));
+          ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
+            maxKey, eventOptions);
+          if (deltaPair.nextRecord != null) {
+            readers.put(key, deltaPair);
+          }
+        }
+      }
+    }
+
+    // get the first record
+    Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
+    if (entry == null) {
+      // none of the readers had a record in range
+      columns = 0;
+      primary = null;
+    } else {
+      primary = entry.getValue();
+      if (readers.isEmpty()) {
+        secondaryKey = null;
+      } else {
+        secondaryKey = readers.firstKey();
+      }
+      // get the number of columns in the user's rows
+      columns = primary.getColumns();
+    }
+  }
+
+  /**
+   * Read the side file to get the last flush length. The side file is read
+   * as a sequence of longs and the last one wins.
+   * @param fs the file system to use
+   * @param deltaFile the path of the delta file
+   * @return the maximum size of the file to use: Long.MAX_VALUE if the side
+   *         file can't be opened, -1 if it was opened but no length could
+   *         be read, and otherwise the last length that was read
+   * @throws IOException
+   */
+  private static long getLastFlushLength(FileSystem fs,
+                                         Path deltaFile) throws IOException {
+    Path lengths = OrcRecordUpdater.getSideFile(deltaFile);
+    long result = Long.MAX_VALUE;
+    try {
+      FSDataInputStream stream = fs.open(lengths);
+      try {
+        result = -1;
+        while (stream.available() > 0) {
+          result = stream.readLong();
+        }
+      } finally {
+        // release the stream even when reading a length fails part way
+        stream.close();
+      }
+      return result;
+    } catch (IOException ioe) {
+      // best effort: fall back to whatever was determined before the failure
+      return result;
+    }
+  }
+
+  /**
+   * Get the key below the lowest key that is processed.
+   * @return the minimum key, or null if no lower bound was set
+   */
+  @VisibleForTesting
+  RecordIdentifier getMinKey() {
+    return minKey;
+  }
+
+  /**
+   * Get the last key that is processed.
+   * @return the maximum key, or null if no upper bound was set
+   */
+  @VisibleForTesting
+  RecordIdentifier getMaxKey() {
+    return maxKey;
+  }
+
+  /**
+   * Get the reader that currently has the lowest key.
+   * @return the primary reader, or null if there are no more records
+   */
+  @VisibleForTesting
+  ReaderPair getCurrentReader() {
+    return primary;
+  }
+
+  /**
+   * Get the readers other than the primary one, sorted by their next key.
+   * @return the live internal map, not a copy
+   */
+  @VisibleForTesting
+  Map<ReaderKey, ReaderPair> getOtherReaders() {
+    return readers;
+  }
+
+  /**
+   * Read the next event in key order across all of the readers. Events
+   * whose current transaction is not committed according to the
+   * validTxnList are skipped. When collapsing, events for the same row as
+   * the previously returned one are skipped as well, so that only the first
+   * event of each row is returned.
+   * @param recordIdentifier the key to fill in, which must be a ReaderKey
+   *                         such as the one from createKey
+   * @param prev the event to fill in; it is linked to the fields of the
+   *             reader's record rather than copied, and may be modified
+   *             even when false is returned
+   * @return true if an event was read, false if the readers are exhausted
+   * @throws IOException
+   */
+  @Override
+  public boolean next(RecordIdentifier recordIdentifier,
+                      OrcStruct prev) throws IOException {
+    // stays true for as long as no returnable event has been found
+    boolean keysSame = true;
+    while (keysSame && primary != null) {
+
+      // The primary's nextRecord is the next value to return
+      OrcStruct current = primary.nextRecord;
+      recordIdentifier.set(primary.key);
+
+      // Advance the primary reader to the next record
+      primary.next(extraValue);
+
+      // Save the current record as the new extraValue for next time so that
+      // we minimize allocations
+      extraValue = current;
+
+      // now that the primary reader has advanced, we need to see if we
+      // continue to read it or move to the secondary.
+      if (primary.nextRecord == null ||
+          primary.key.compareTo(secondaryKey) > 0) {
+
+        // if the primary isn't done, push it back into the readers
+        if (primary.nextRecord != null) {
+          readers.put(primary.key, primary);
+        }
+
+        // update primary and secondaryKey
+        Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
+        if (entry != null) {
+          primary = entry.getValue();
+          if (readers.isEmpty()) {
+            secondaryKey = null;
+          } else {
+            secondaryKey = readers.firstKey();
+          }
+        } else {
+          primary = null;
+        }
+      }
+
+      // if this transaction isn't ok, skip over it
+      if (!validTxnList.isTxnCommitted(
+          ((ReaderKey) recordIdentifier).getCurrentTransactionId())) {
+        continue;
+      }
+
+      // if we are collapsing, figure out if this is a new row
+      if (collapse) {
+        keysSame = prevKey.compareRow(recordIdentifier) == 0;
+        if (!keysSame) {
+          prevKey.set(recordIdentifier);
+        }
+      } else {
+        keysSame = false;
+      }
+
+      // set the output record by fiddling with the pointers so that we can
+      // avoid a copy.
+      prev.linkFields(current);
+    }
+    return !keysSame;
+  }
+
+  /**
+   * Create a key that can be passed to next.
+   * @return a new ReaderKey
+   */
+  @Override
+  public RecordIdentifier createKey() {
+    return new ReaderKey();
+  }
+
+  /**
+   * Create an event that can be passed to next.
+   * @return a new OrcStruct with OrcRecordUpdater.FIELDS fields
+   */
+  @Override
+  public OrcStruct createValue() {
+    return new OrcStruct(OrcRecordUpdater.FIELDS);
+  }
+
+  /**
+   * Estimate the position by scaling the length that was given in the read
+   * options by the progress and adding it to the offset.
+   * @return the estimated position
+   * @throws IOException
+   */
+  @Override
+  public long getPos() throws IOException {
+    return offset + (long)(getProgress() * length);
+  }
+
+  /**
+   * Close all of the readers that are still open. Readers that ran out of
+   * records were already closed when their last record was read.
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    // the primary reader has been removed from the map, so it has to be
+    // closed separately or it would leak
+    if (primary != null) {
+      primary.recordReader.close();
+    }
+    for(ReaderPair pair: readers.values()) {
+      pair.recordReader.close();
+    }
+  }
+
+  /**
+   * Get the progress, which is taken from the base reader alone.
+   * @return 1 if there is no base reader and the base reader's progress
+   *         otherwise
+   * @throws IOException
+   */
+  @Override
+  public float getProgress() throws IOException {
+    return baseReader == null ? 1 : baseReader.getProgress();
+  }
+
+  /**
+   * Build the object inspector for the events from the column names and
+   * types in the configuration. The user's row type is wrapped by
+   * OrcRecordUpdater.createEventSchema.
+   * @return the object inspector for the events
+   */
+  @Override
+  public ObjectInspector getObjectInspector() {
+    // the comma separated list of column names
+    final String namesProperty = conf.get(serdeConstants.LIST_COLUMNS);
+    final ArrayList<String> names = new ArrayList<String>();
+    if (namesProperty != null && namesProperty.length() > 0) {
+      for (String name : namesProperty.split(",")) {
+        names.add(name);
+      }
+    }
+
+    // NOTE: if "columns.types" is missing, all columns will be of String type
+    String typesProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+    if (typesProperty == null) {
+      // one "string" per column name
+      final StringBuilder defaults = new StringBuilder();
+      final int count = names.size();
+      for (int col = 0; col < count; col++) {
+        if (col != 0) {
+          defaults.append(":");
+        }
+        defaults.append("string");
+      }
+      typesProperty = defaults.toString();
+    }
+    final ArrayList<TypeInfo> types =
+        TypeInfoUtils.getTypeInfosFromTypeString(typesProperty);
+
+    // describe the user's row and wrap it with the event fields
+    final StructTypeInfo userRow = new StructTypeInfo();
+    userRow.setAllStructFieldNames(names);
+    userRow.setAllStructFieldTypeInfos(types);
+    final ObjectInspector rowInspector =
+        OrcStruct.createObjectInspector(userRow);
+    return OrcRecordUpdater.createEventSchema(rowInspector);
+  }
+
+  /**
+   * Get the number of columns in the underlying rows. The value is taken
+   * once, in the constructor, from the reader that had the lowest key.
+   * @return 0 if none of the base and delta readers had a record to read.
+   */
+  public int getColumns() {
+    return columns;
+  }
+}