Posted to commits@hive.apache.org by om...@apache.org on 2014/03/26 19:14:39 UTC

svn commit: r1581977 [5/5] - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/common/ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/ metastore/src/java/org/apache/hadoop/hive/metastore/ metastore/src/java/org/apache/hadoop/hi...

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1581977&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java Wed Mar 26 18:14:37 2014
@@ -0,0 +1,1111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
+import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
+import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.mockito.MockSettings;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestOrcRawRecordMerger {
+
+  private static final Log LOG = LogFactory.getLog(TestOrcRawRecordMerger.class);
+
+  @Test
+  public void testOrdering() throws Exception {
+    ReaderKey left = new ReaderKey(100, 200, 1200, 300);
+    ReaderKey right = new ReaderKey();
+    right.setValues(100, 200, 1000, 200);
+    assertTrue(right.compareTo(left) < 0);
+    assertTrue(left.compareTo(right) > 0);
+    assertEquals(false, left.equals(right));
+    left.set(right);
+    assertTrue(right.compareTo(left) == 0);
+    assertEquals(true, right.equals(left));
+    right.setRowId(2000);
+    assertTrue(right.compareTo(left) > 0);
+    left.setValues(1, 2, 3, 4);
+    right.setValues(100, 2, 3, 4);
+    assertTrue(left.compareTo(right) < 0);
+    assertTrue(right.compareTo(left) > 0);
+    left.setValues(1, 2, 3, 4);
+    right.setValues(1, 100, 3, 4);
+    assertTrue(left.compareTo(right) < 0);
+    assertTrue(right.compareTo(left) > 0);
+    left.setValues(1, 2, 3, 100);
+    right.setValues(1, 2, 3, 4);
+    assertTrue(left.compareTo(right) < 0);
+    assertTrue(right.compareTo(left) > 0);
+
+    // ensure that we are consistent when comparing to the base class
+    RecordIdentifier ri = new RecordIdentifier(1, 2, 3);
+    assertEquals(1, ri.compareTo(left));
+    assertEquals(-1, left.compareTo(ri));
+    assertEquals(false, ri.equals(left));
+    assertEquals(false, left.equals(ri));
+  }
+
+  private static void setRow(OrcStruct event,
+                             int operation,
+                             long originalTransaction,
+                             int bucket,
+                             long rowId,
+                             long currentTransaction,
+                             String value) {
+    event.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
+    event.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+        new LongWritable(originalTransaction));
+    event.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucket));
+    event.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(rowId));
+    event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+        new LongWritable(currentTransaction));
+    OrcStruct row = new OrcStruct(1);
+    row.setFieldValue(0, new Text(value));
+    event.setFieldValue(OrcRecordUpdater.ROW, row);
+  }
+
+  private static String value(OrcStruct event) {
+    return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
+  }
+
+  private List<StripeInformation> createStripes(long... rowCounts) {
+    long offset = 0;
+    List<StripeInformation> result =
+        new ArrayList<StripeInformation>(rowCounts.length);
+    for(long count: rowCounts) {
+      OrcProto.StripeInformation.Builder stripe =
+          OrcProto.StripeInformation.newBuilder();
+      stripe.setDataLength(800).setIndexLength(100).setFooterLength(100)
+          .setNumberOfRows(count).setOffset(offset);
+      offset += 1000;
+      result.add(new ReaderImpl.StripeInformationImpl(stripe.build()));
+    }
+    return result;
+  }
+
+  // .verboseLogging() makes Mockito log each invocation; remove it to quiet the output
+  private final MockSettings settings = Mockito.withSettings().verboseLogging();
+
+  private Reader createMockReader() throws IOException {
+    Reader reader = Mockito.mock(Reader.class, settings);
+    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
+    OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
+    OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
+    OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
+    OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
+    OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
+    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
+        .thenReturn(recordReader);
+
+    Mockito.when(recordReader.hasNext()).
+        thenReturn(true, true, true, true, true, false);
+
+    Mockito.when(recordReader.getProgress()).thenReturn(1.0f);
+
+    Mockito.when(recordReader.next(null)).thenReturn(row1);
+    Mockito.when(recordReader.next(row1)).thenReturn(row2);
+    Mockito.when(recordReader.next(row2)).thenReturn(row3);
+    Mockito.when(recordReader.next(row3)).thenReturn(row4);
+    Mockito.when(recordReader.next(row4)).thenReturn(row5);
+
+    return reader;
+  }
+
+  @Test
+  public void testReaderPair() throws Exception {
+    ReaderKey key = new ReaderKey();
+    Reader reader = createMockReader();
+    RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
+    RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
+    ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
+        new Reader.Options());
+    RecordReader recordReader = pair.recordReader;
+    assertEquals(10, key.getTransactionId());
+    assertEquals(20, key.getBucketId());
+    assertEquals(40, key.getRowId());
+    assertEquals(120, key.getCurrentTransactionId());
+    assertEquals("third", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(40, key.getTransactionId());
+    assertEquals(50, key.getBucketId());
+    assertEquals(60, key.getRowId());
+    assertEquals(130, key.getCurrentTransactionId());
+    assertEquals("fourth", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(null, pair.nextRecord);
+    Mockito.verify(recordReader).close();
+  }
+
+  @Test
+  public void testReaderPairNoMin() throws Exception {
+    ReaderKey key = new ReaderKey();
+    Reader reader = createMockReader();
+
+    ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
+        new Reader.Options());
+    RecordReader recordReader = pair.recordReader;
+    assertEquals(10, key.getTransactionId());
+    assertEquals(20, key.getBucketId());
+    assertEquals(20, key.getRowId());
+    assertEquals(100, key.getCurrentTransactionId());
+    assertEquals("first", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(10, key.getTransactionId());
+    assertEquals(20, key.getBucketId());
+    assertEquals(30, key.getRowId());
+    assertEquals(110, key.getCurrentTransactionId());
+    assertEquals("second", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(10, key.getTransactionId());
+    assertEquals(20, key.getBucketId());
+    assertEquals(40, key.getRowId());
+    assertEquals(120, key.getCurrentTransactionId());
+    assertEquals("third", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(40, key.getTransactionId());
+    assertEquals(50, key.getBucketId());
+    assertEquals(60, key.getRowId());
+    assertEquals(130, key.getCurrentTransactionId());
+    assertEquals("fourth", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(40, key.getTransactionId());
+    assertEquals(50, key.getBucketId());
+    assertEquals(61, key.getRowId());
+    assertEquals(140, key.getCurrentTransactionId());
+    assertEquals("fifth", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(null, pair.nextRecord);
+    Mockito.verify(recordReader).close();
+  }
+
+  private static OrcStruct createOriginalRow(String value) {
+    OrcStruct result = new OrcStruct(1);
+    result.setFieldValue(0, new Text(value));
+    return result;
+  }
+
+  private Reader createMockOriginalReader() throws IOException {
+    Reader reader = Mockito.mock(Reader.class, settings);
+    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
+    OrcStruct row1 = createOriginalRow("first");
+    OrcStruct row2 = createOriginalRow("second");
+    OrcStruct row3 = createOriginalRow("third");
+    OrcStruct row4 = createOriginalRow("fourth");
+    OrcStruct row5 = createOriginalRow("fifth");
+
+    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
+        .thenReturn(recordReader);
+    Mockito.when(recordReader.hasNext()).
+        thenReturn(true, true, true, true, true, false);
+    Mockito.when(recordReader.getRowNumber()).thenReturn(0L, 1L, 2L, 3L, 4L);
+    Mockito.when(recordReader.next(null)).thenReturn(row1);
+    Mockito.when(recordReader.next(row1)).thenReturn(row2);
+    Mockito.when(recordReader.next(row2)).thenReturn(row3);
+    Mockito.when(recordReader.next(row3)).thenReturn(row4);
+    Mockito.when(recordReader.next(row4)).thenReturn(row5);
+    return reader;
+  }
+
+  @Test
+  public void testOriginalReaderPair() throws Exception {
+    ReaderKey key = new ReaderKey();
+    Reader reader = createMockOriginalReader();
+    RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
+    RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
+    boolean[] includes = new boolean[]{true, true};
+    ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
+        new Reader.Options().include(includes));
+    RecordReader recordReader = pair.recordReader;
+    assertEquals(0, key.getTransactionId());
+    assertEquals(10, key.getBucketId());
+    assertEquals(2, key.getRowId());
+    assertEquals(0, key.getCurrentTransactionId());
+    assertEquals("third", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(0, key.getTransactionId());
+    assertEquals(10, key.getBucketId());
+    assertEquals(3, key.getRowId());
+    assertEquals(0, key.getCurrentTransactionId());
+    assertEquals("fourth", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(null, pair.nextRecord);
+    Mockito.verify(recordReader).close();
+  }
+
+  private static ValidTxnList createMaximalTxnList() {
+    return new ValidTxnListImpl(Long.MAX_VALUE + ":");
+  }
+
+  @Test
+  public void testOriginalReaderPairNoMin() throws Exception {
+    ReaderKey key = new ReaderKey();
+    Reader reader = createMockOriginalReader();
+    ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
+        new Reader.Options());
+    assertEquals("first", value(pair.nextRecord));
+    assertEquals(0, key.getTransactionId());
+    assertEquals(10, key.getBucketId());
+    assertEquals(0, key.getRowId());
+    assertEquals(0, key.getCurrentTransactionId());
+
+    pair.next(pair.nextRecord);
+    assertEquals("second", value(pair.nextRecord));
+    assertEquals(0, key.getTransactionId());
+    assertEquals(10, key.getBucketId());
+    assertEquals(1, key.getRowId());
+    assertEquals(0, key.getCurrentTransactionId());
+
+    pair.next(pair.nextRecord);
+    assertEquals("third", value(pair.nextRecord));
+    assertEquals(0, key.getTransactionId());
+    assertEquals(10, key.getBucketId());
+    assertEquals(2, key.getRowId());
+    assertEquals(0, key.getCurrentTransactionId());
+
+    pair.next(pair.nextRecord);
+    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals(0, key.getTransactionId());
+    assertEquals(10, key.getBucketId());
+    assertEquals(3, key.getRowId());
+    assertEquals(0, key.getCurrentTransactionId());
+
+    pair.next(pair.nextRecord);
+    assertEquals("fifth", value(pair.nextRecord));
+    assertEquals(0, key.getTransactionId());
+    assertEquals(10, key.getBucketId());
+    assertEquals(4, key.getRowId());
+    assertEquals(0, key.getCurrentTransactionId());
+
+    pair.next(pair.nextRecord);
+    assertEquals(null, pair.nextRecord);
+    Mockito.verify(pair.recordReader).close();
+  }
+
+  @Test
+  public void testNewBase() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("columns", "col1");
+    conf.set("columns.types", "string");
+    Reader reader = Mockito.mock(Reader.class, settings);
+    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
+
+    List<OrcProto.Type> types = new ArrayList<OrcProto.Type>();
+    OrcProto.Type.Builder typeBuilder = OrcProto.Type.newBuilder();
+    typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1)
+        .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5)
+        .addSubtypes(6);
+    types.add(typeBuilder.build());
+    types.add(null);
+    types.add(null);
+    types.add(null);
+    types.add(null);
+    types.add(null);
+    typeBuilder.clearSubtypes();
+    typeBuilder.addSubtypes(7);
+    types.add(typeBuilder.build());
+
+    Mockito.when(reader.getTypes()).thenReturn(types);
+    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
+        .thenReturn(recordReader);
+
+    OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
+    OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
+    OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
+    OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
+    OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
+
+    Mockito.when(recordReader.hasNext()).
+        thenReturn(true, true, true, true, true, false);
+
+    Mockito.when(recordReader.getProgress()).thenReturn(1.0f);
+
+    Mockito.when(recordReader.next(null)).thenReturn(row1, row4);
+    Mockito.when(recordReader.next(row1)).thenReturn(row2);
+    Mockito.when(recordReader.next(row2)).thenReturn(row3);
+    Mockito.when(recordReader.next(row3)).thenReturn(row5);
+
+    Mockito.when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
+        .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61"
+            .getBytes("UTF-8")));
+    Mockito.when(reader.getStripes())
+        .thenReturn(createStripes(2, 2, 1));
+
+    OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
+        false, 10, createMaximalTxnList(),
+        new Reader.Options().range(1000, 1000), null);
+    RecordReader rr = merger.getCurrentReader().recordReader;
+    assertEquals(0, merger.getOtherReaders().size());
+
+    assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
+    assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey());
+    RecordIdentifier id = merger.createKey();
+    OrcStruct event = merger.createValue();
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(10, id.getTransactionId());
+    assertEquals(20, id.getBucketId());
+    assertEquals(40, id.getRowId());
+    assertEquals("third", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(40, id.getTransactionId());
+    assertEquals(50, id.getBucketId());
+    assertEquals(60, id.getRowId());
+    assertEquals("fourth", getValue(event));
+
+    assertEquals(false, merger.next(id, event));
+    assertEquals(1.0, merger.getProgress(), 0.01);
+    merger.close();
+    Mockito.verify(rr).close();
+    Mockito.verify(rr).getProgress();
+
+    StructObjectInspector eventObjectInspector =
+        (StructObjectInspector) merger.getObjectInspector();
+    List<? extends StructField> fields =
+        eventObjectInspector.getAllStructFieldRefs();
+    assertEquals(OrcRecordUpdater.FIELDS, fields.size());
+    assertEquals("operation",
+        fields.get(OrcRecordUpdater.OPERATION).getFieldName());
+    assertEquals("currentTransaction",
+        fields.get(OrcRecordUpdater.CURRENT_TRANSACTION).getFieldName());
+    assertEquals("originalTransaction",
+        fields.get(OrcRecordUpdater.ORIGINAL_TRANSACTION).getFieldName());
+    assertEquals("bucket",
+        fields.get(OrcRecordUpdater.BUCKET).getFieldName());
+    assertEquals("rowId",
+        fields.get(OrcRecordUpdater.ROW_ID).getFieldName());
+    StructObjectInspector rowObjectInspector =
+        (StructObjectInspector) fields.get(OrcRecordUpdater.ROW)
+            .getFieldObjectInspector();
+    assertEquals("col1",
+        rowObjectInspector.getAllStructFieldRefs().get(0).getFieldName());
+  }
+
+  static class MyRow {
+    Text col1;
+    MyRow(String val) {
+      col1 = new Text(val);
+    }
+  }
+
+  static String getValue(OrcStruct event) {
+    return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
+  }
+
+  @Test
+  public void testEmpty() throws Exception {
+    final int BUCKET = 0;
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp" +
+            File.separator + "testEmpty")).makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write the empty base
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .inspector(inspector).bucket(BUCKET).writingBase(true)
+        .maximumTransactionId(100);
+    of.getRecordUpdater(root, options).close(false);
+
+    ValidTxnList txnList = new ValidTxnListImpl("200:");
+    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
+
+    Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
+        BUCKET);
+    Reader baseReader = OrcFile.createReader(basePath,
+        OrcFile.readerOptions(conf));
+    OrcRawRecordMerger merger =
+        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+            createMaximalTxnList(), new Reader.Options(),
+            AcidUtils.getPaths(directory.getCurrentDirectories()));
+    RecordIdentifier key = merger.createKey();
+    OrcStruct value = merger.createValue();
+    assertEquals(false, merger.next(key, value));
+  }
+
+  /**
+   * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is
+   * a base and a delta.
+   * @throws Exception
+   */
+  @Test
+  public void testNewBaseAndDelta() throws Exception {
+    final int BUCKET = 10;
+    String[] values = new String[]{"first", "second", "third", "fourth",
+                                   "fifth", "sixth", "seventh", "eighth",
+                                   "ninth", "tenth"};
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp" +
+            File.separator + "testNewBaseAndDelta")).makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write the base
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .inspector(inspector).bucket(BUCKET);
+    RecordUpdater ru = of.getRecordUpdater(root,
+        options.writingBase(true).maximumTransactionId(100));
+    for(String v: values) {
+      ru.insert(0, new MyRow(v));
+    }
+    ru.close(false);
+
+    // write a delta
+    ru = of.getRecordUpdater(root, options.writingBase(false)
+        .minimumTransactionId(200).maximumTransactionId(200));
+    ru.update(200, 0, 0, new MyRow("update 1"));
+    ru.update(200, 0, 2, new MyRow("update 2"));
+    ru.update(200, 0, 3, new MyRow("update 3"));
+    ru.delete(200, 0, 7);
+    ru.delete(200, 0, 8);
+    ru.close(false);
+
+    ValidTxnList txnList = new ValidTxnListImpl("200:");
+    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
+
+    assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
+    assertEquals(new Path(root, "delta_0000200_0000200"),
+        directory.getCurrentDirectories().get(0).getPath());
+
+    Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
+        BUCKET);
+    Reader baseReader = OrcFile.createReader(basePath,
+        OrcFile.readerOptions(conf));
+    OrcRawRecordMerger merger =
+        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+            createMaximalTxnList(), new Reader.Options(),
+            AcidUtils.getPaths(directory.getCurrentDirectories()));
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
+    RecordIdentifier id = merger.createKey();
+    OrcStruct event = merger.createValue();
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
+    assertEquals("update 1", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
+    assertEquals("second", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
+    assertEquals("update 2", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
+    assertEquals("update 3", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
+    assertEquals("fifth", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
+    assertEquals("sixth", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
+    assertEquals("seventh", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
+    assertEquals(null, OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
+    assertEquals(null, OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
+    assertEquals("tenth", getValue(event));
+
+    assertEquals(false, merger.next(id, event));
+    merger.close();
+
+    // make a merger that doesn't collapse events
+    merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+            createMaximalTxnList(), new Reader.Options(),
+            AcidUtils.getPaths(directory.getCurrentDirectories()));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
+    assertEquals("update 1", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 0, 0), id);
+    assertEquals("first", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
+    assertEquals("second", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
+    assertEquals("update 2", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 2, 0), id);
+    assertEquals("third", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
+    assertEquals("update 3", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 3, 0), id);
+    assertEquals("fourth", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
+    assertEquals("fifth", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
+    assertEquals("sixth", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
+    assertEquals("seventh", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
+    assertEquals(null, OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 7, 0), id);
+    assertEquals("eighth", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
+    assertEquals(null, OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 8, 0), id);
+    assertEquals("ninth", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
+    assertEquals("tenth", getValue(event));
+
+    assertEquals(false, merger.next(id, event));
+    merger.close();
+
+    // try ignoring the 200 transaction and make sure it works still
+    ValidTxnList txns = new ValidTxnListImpl("2000:200");
+    merger =
+        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+            txns, new Reader.Options(),
+            AcidUtils.getPaths(directory.getCurrentDirectories()));
+    for(int i=0; i < values.length; ++i) {
+      assertEquals(true, merger.next(id, event));
+      LOG.info("id = " + id + "event = " + event);
+      assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+          OrcRecordUpdater.getOperation(event));
+      assertEquals(new ReaderKey(0, BUCKET, i, 0), id);
+      assertEquals(values[i], getValue(event));
+    }
+
+    assertEquals(false, merger.next(id, event));
+    merger.close();
+  }
+
+  static class BigRow {
+    int myint;
+    long mylong;
+    Text mytext;
+    float myfloat;
+    double mydouble;
+
+    BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble) {
+      this.myint = myint;
+      this.mylong = mylong;
+      this.mytext = new Text(mytext);
+      this.myfloat = myfloat;
+      this.mydouble = mydouble;
+    }
+  }
+
+  /**
+   * Test the RecordReader when there is an old-style (non-ACID) base
+   * and deltas.
+   * @throws Exception
+   */
+  @Test
+  public void testRecordReaderOldBaseAndDelta() throws Exception {
+    final int BUCKET = 10;
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp" +
+            File.separator + "testOldBaseAndDelta")).makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write the base
+    MemoryManager mgr = new MemoryManager(conf){
+      int rowsAddedSinceCheck = 0;
+
+      synchronized void addedRow() throws IOException {
+        if (++rowsAddedSinceCheck >= 2) {
+          notifyWriters();
+          rowsAddedSinceCheck = 0;
+        }
+      }
+    };
+    // make 5 stripes with 2 rows each
+    Writer writer = OrcFile.createWriter(new Path(root, "0000010_0"),
+        OrcFile.writerOptions(conf).inspector(inspector).fileSystem(fs)
+        .blockPadding(false).bufferSize(10000).compress(CompressionKind.NONE)
+        .stripeSize(1).memory(mgr).version(OrcFile.Version.V_0_11));
+    String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
+       "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
+    for(int i=0; i < values.length; ++i) {
+      writer.addRow(new BigRow(i, i, values[i], i, i));
+    }
+    writer.close();
+
+    // write a delta
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
+        .bucket(BUCKET).inspector(inspector).filesystem(fs);
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    values = new String[]{"0.0", null, null, "1.1", null, null, null,
+        "ignore.7"};
+    for(int i=0; i < values.length; ++i) {
+      if (values[i] != null) {
+        ru.update(1, 0, i, new BigRow(i, i, values[i], i, i));
+      }
+    }
+    ru.delete(100, 0, 9);
+    ru.close(false);
+
+    // write a delta
+    options = options.minimumTransactionId(2).maximumTransactionId(2);
+    ru = of.getRecordUpdater(root, options);
+    values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
+    for(int i=0; i < values.length; ++i) {
+      if (values[i] != null) {
+        ru.update(2, 0, i, new BigRow(i, i, values[i], i, i));
+      }
+    }
+    ru.delete(100, 0, 8);
+    ru.close(false);
+
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.min.split.size", "1");
+    job.set("mapred.max.split.size", "2");
+    job.set("mapred.input.dir", root.toString());
+    InputSplit[] splits = inf.getSplits(job, 5);
+    assertEquals(5, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
+
+    // loop through the 5 splits and read each
+    for(int i=0; i < 4; ++i) {
+      System.out.println("starting split " + i);
+      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+      NullWritable key = rr.createKey();
+      OrcStruct value = rr.createValue();
+
+      // there should be exactly two rows per split
+      for(int j=0; j < 2; ++j) {
+        System.out.println("i = " + i + ", j = " + j);
+        assertEquals(true, rr.next(key, value));
+        System.out.println("record = " + value);
+        assertEquals(i + "." + j, value.getFieldValue(2).toString());
+      }
+      assertEquals(false, rr.next(key, value));
+    }
+    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
+    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+  }
+
+  /**
+   * Test the RecordReader when there is a new base and a delta.
+   * @throws Exception
+   */
+  @Test
+  public void testRecordReaderNewBaseAndDelta() throws Exception {
+    final int BUCKET = 10;
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp" +
+            File.separator + "testRecordReaderNewBaseAndDelta"))
+        .makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write the base
+    MemoryManager mgr = new MemoryManager(conf){
+      int rowsAddedSinceCheck = 0;
+
+      synchronized void addedRow() throws IOException {
+        if (++rowsAddedSinceCheck >= 2) {
+          notifyWriters();
+          rowsAddedSinceCheck = 0;
+        }
+      }
+    };
+
+    // make 5 stripes with 2 rows each
+    OrcRecordUpdater.OrcOptions options = (OrcRecordUpdater.OrcOptions)
+        new OrcRecordUpdater.OrcOptions(conf)
+        .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
+        .bucket(BUCKET).inspector(inspector).filesystem(fs);
+    options.orcOptions(OrcFile.writerOptions(conf)
+      .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
+      .memory(mgr));
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
+        "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(0, new BigRow(i, i, values[i], i, i));
+    }
+    ru.close(false);
+
+    // write a delta
+    options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
+    ru = of.getRecordUpdater(root, options);
+    values = new String[]{"0.0", null, null, "1.1", null, null, null,
+        "ignore.7"};
+    for(int i=0; i < values.length; ++i) {
+      if (values[i] != null) {
+        ru.update(1, 0, i, new BigRow(i, i, values[i], i, i));
+      }
+    }
+    ru.delete(100, 0, 9);
+    ru.close(false);
+
+    // write a delta
+    options.minimumTransactionId(2).maximumTransactionId(2);
+    ru = of.getRecordUpdater(root, options);
+    values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
+    for(int i=0; i < values.length; ++i) {
+      if (values[i] != null) {
+        ru.update(2, 0, i, new BigRow(i, i, values[i], i, i));
+      }
+    }
+    ru.delete(100, 0, 8);
+    ru.close(false);
+
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.min.split.size", "1");
+    job.set("mapred.max.split.size", "2");
+    job.set("mapred.input.dir", root.toString());
+    InputSplit[] splits = inf.getSplits(job, 5);
+    assertEquals(5, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
+
+    // loop through the 5 splits and read each
+    for(int i=0; i < 4; ++i) {
+      System.out.println("starting split " + i);
+      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+      NullWritable key = rr.createKey();
+      OrcStruct value = rr.createValue();
+
+      // there should be exactly two rows per split
+      for(int j=0; j < 2; ++j) {
+        System.out.println("i = " + i + ", j = " + j);
+        assertEquals(true, rr.next(key, value));
+        System.out.println("record = " + value);
+        assertEquals(i + "." + j, value.getFieldValue(2).toString());
+      }
+      assertEquals(false, rr.next(key, value));
+    }
+    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
+    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+  }
+
+  /**
+   * Test the RecordReader when there are only deltas and no base.
+   * @throws Exception
+   */
+  @Test
+  public void testRecordReaderDelta() throws Exception {
+    final int BUCKET = 0;
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp" +
+            File.separator + "testRecordReaderDelta"))
+        .makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write a delta
+    AcidOutputFormat.Options options =
+        new AcidOutputFormat.Options(conf)
+            .bucket(BUCKET).inspector(inspector).filesystem(fs)
+            .writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    String[] values = new String[]{"a", "b", "c", "d", "e"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(1, new MyRow(values[i]));
+    }
+    ru.close(false);
+
+    // write a delta
+    options.minimumTransactionId(2).maximumTransactionId(2);
+    ru = of.getRecordUpdater(root, options);
+    values = new String[]{"f", "g", "h", "i", "j"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(2, new MyRow(values[i]));
+    }
+    ru.close(false);
+
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.min.split.size", "1");
+    job.set("mapred.max.split.size", "2");
+    job.set("mapred.input.dir", root.toString());
+    job.set("bucket_count", "1");
+    InputSplit[] splits = inf.getSplits(job, 5);
+    assertEquals(1, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
+    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
+    values = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
+    OrcStruct row = rr.createValue();
+    for(int i = 0; i < values.length; ++i) {
+      System.out.println("Checking " + i);
+      assertEquals(true, rr.next(NullWritable.get(), row));
+      assertEquals(values[i], row.getFieldValue(0).toString());
+    }
+    assertEquals(false, rr.next(NullWritable.get(), row));
+  }
+
+  /**
+   * Test the RecordReader when the delta has been flushed, but not closed.
+   * @throws Exception
+   */
+  @Test
+  public void testRecordReaderIncompleteDelta() throws Exception {
+    final int BUCKET = 1;
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    Path root = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp" +
+            File.separator + "testRecordReaderIncompleteDelta"))
+        .makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write a base
+    AcidOutputFormat.Options options =
+        new AcidOutputFormat.Options(conf)
+            .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
+            .bucket(BUCKET).inspector(inspector).filesystem(fs);
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    String[] values= new String[]{"1", "2", "3", "4", "5"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(0, new MyRow(values[i]));
+    }
+    ru.close(false);
+
+    // write a delta
+    options.writingBase(false).minimumTransactionId(10)
+        .maximumTransactionId(19);
+    ru = of.getRecordUpdater(root, options);
+    values = new String[]{"6", "7", "8"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(1, new MyRow(values[i]));
+    }
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.input.dir", root.toString());
+    job.set("bucket_count", "2");
+
+    // read the keys before the delta is flushed
+    InputSplit[] splits = inf.getSplits(job, 1);
+    assertEquals(2, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
+        inf.getRecordReader(splits[0], job, Reporter.NULL);
+    NullWritable key = rr.createKey();
+    OrcStruct value = rr.createValue();
+    System.out.println("Looking at split " + splits[0]);
+    for(int i=1; i < 6; ++i) {
+      System.out.println("Checking row " + i);
+      assertEquals(true, rr.next(key, value));
+      assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
+    }
+    assertEquals(false, rr.next(key, value));
+
+    ru.flush();
+    ru.flush();
+    values = new String[]{"9", "10"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(3, new MyRow(values[i]));
+    }
+    ru.flush();
+
+    splits = inf.getSplits(job, 1);
+    assertEquals(2, splits.length);
+    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
+    Path sideFile = new Path(root +
+        "/delta_0000010_0000019/bucket_00001_flush_length");
+    assertEquals(true, fs.exists(sideFile));
+    assertEquals(24, fs.getFileStatus(sideFile).getLen());
+
+    for(int i=1; i < 11; ++i) {
+      assertEquals(true, rr.next(key, value));
+      assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
+    }
+    assertEquals(false, rr.next(key, value));
+  }
+
+}
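
The merger tests above control transaction visibility through ValidTxnListImpl
strings such as "200:", Long.MAX_VALUE + ":", and "2000:200" (the last one is
used to "ignore the 200 transaction"). As a minimal sketch, assuming the format
is "<highWatermark>:<excluded txn>" as those cases suggest, such strings could
be assembled like this (the helper names are illustrative and not part of the
commit):

    // Illustrative helpers, not part of this commit. They assume the string
    // format "<highWatermark>:<excluded txn>" seen in the tests above:
    // "200:" treats every txn up to 200 as valid, while "2000:200" also
    // treats txn 200 as invisible. How multiple excluded txns would be
    // separated is not exercised by these tests.
    static ValidTxnList validUpTo(long highWatermark) {
      return new ValidTxnListImpl(highWatermark + ":");
    }

    static ValidTxnList validUpToExcluding(long highWatermark, long excludedTxn) {
      return new ValidTxnListImpl(highWatermark + ":" + excludedTxn);
    }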

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java?rev=1581977&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java Wed Mar 26 18:14:37 2014
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestOrcRecordUpdater {
+
+  @Test
+  public void testAccessors() throws Exception {
+    OrcStruct event = new OrcStruct(OrcRecordUpdater.FIELDS);
+    event.setFieldValue(OrcRecordUpdater.OPERATION,
+        new IntWritable(OrcRecordUpdater.INSERT_OPERATION));
+    event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+        new LongWritable(100));
+    event.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+        new LongWritable(50));
+    event.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(200));
+    event.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(300));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(event));
+    assertEquals(50, OrcRecordUpdater.getOriginalTransaction(event));
+    assertEquals(100, OrcRecordUpdater.getCurrentTransaction(event));
+    assertEquals(200, OrcRecordUpdater.getBucket(event));
+    assertEquals(300, OrcRecordUpdater.getRowId(event));
+  }
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+
+  static class MyRow {
+    Text field;
+    MyRow(String val) {
+      field = new Text(val);
+    }
+  }
+
+  @Test
+  public void testWriter() throws Exception {
+    Path root = new Path(workDir, "testWriter");
+    Configuration conf = new Configuration();
+    // Must use raw local because the checksummer doesn't honor flushes.
+    FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .filesystem(fs)
+        .bucket(10)
+        .writingBase(false)
+        .minimumTransactionId(10)
+        .maximumTransactionId(19)
+        .inspector(inspector)
+        .reporter(Reporter.NULL);
+    RecordUpdater updater = new OrcRecordUpdater(root, options);
+    updater.insert(11, new MyRow("first"));
+    updater.insert(11, new MyRow("second"));
+    updater.insert(11, new MyRow("third"));
+    updater.flush();
+    updater.insert(12, new MyRow("fourth"));
+    updater.insert(12, new MyRow("fifth"));
+    updater.flush();
+    Path bucketPath = AcidUtils.createFilename(root, options);
+    Path sidePath = OrcRecordUpdater.getSideFile(bucketPath);
+    DataInputStream side = fs.open(sidePath);
+
+    // read the stopping point for the first flush and make sure we only see
+    // 3 rows
+    long len = side.readLong();
+    Reader reader = OrcFile.createReader(bucketPath,
+        new OrcFile.ReaderOptions(conf).filesystem(fs).maxLength(len));
+    assertEquals(3, reader.getNumberOfRows());
+
+    // read the second flush and make sure we see all 5 rows
+    len = side.readLong();
+    side.close();
+    reader = OrcFile.createReader(bucketPath,
+        new OrcFile.ReaderOptions(conf).filesystem(fs).maxLength(len));
+    assertEquals(5, reader.getNumberOfRows());
+    RecordReader rows = reader.rows();
+
+    // check the contents of the file
+    assertEquals(true, rows.hasNext());
+    OrcStruct row = (OrcStruct) rows.next(null);
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+        OrcRecordUpdater.getOperation(row));
+    assertEquals(11, OrcRecordUpdater.getCurrentTransaction(row));
+    assertEquals(11, OrcRecordUpdater.getOriginalTransaction(row));
+    assertEquals(10, OrcRecordUpdater.getBucket(row));
+    assertEquals(0, OrcRecordUpdater.getRowId(row));
+    assertEquals("first",
+        OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+    assertEquals(true, rows.hasNext());
+    row = (OrcStruct) rows.next(null);
+    assertEquals(1, OrcRecordUpdater.getRowId(row));
+    assertEquals(10, OrcRecordUpdater.getBucket(row));
+    assertEquals("second",
+        OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+    assertEquals(true, rows.hasNext());
+    row = (OrcStruct) rows.next(null);
+    assertEquals(2, OrcRecordUpdater.getRowId(row));
+    assertEquals(10, OrcRecordUpdater.getBucket(row));
+    assertEquals("third",
+        OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+    assertEquals(true, rows.hasNext());
+    row = (OrcStruct) rows.next(null);
+    assertEquals(12, OrcRecordUpdater.getCurrentTransaction(row));
+    assertEquals(12, OrcRecordUpdater.getOriginalTransaction(row));
+    assertEquals(10, OrcRecordUpdater.getBucket(row));
+    assertEquals(0, OrcRecordUpdater.getRowId(row));
+    assertEquals("fourth",
+        OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+    assertEquals(true, rows.hasNext());
+    row = (OrcStruct) rows.next(null);
+    assertEquals(1, OrcRecordUpdater.getRowId(row));
+    assertEquals("fifth",
+        OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+    assertEquals(false, rows.hasNext());
+
+    // add one more record and close
+    updater.insert(20, new MyRow("sixth"));
+    updater.close(false);
+    reader = OrcFile.createReader(bucketPath,
+        new OrcFile.ReaderOptions(conf).filesystem(fs));
+    assertEquals(6, reader.getNumberOfRows());
+    assertEquals(false, fs.exists(sidePath));
+  }
+
+  @Test
+  public void testUpdates() throws Exception {
+    Path root = new Path(workDir, "testUpdates");
+    Configuration conf = new Configuration();
+    FileSystem fs = root.getFileSystem(conf);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .filesystem(fs)
+        .bucket(20)
+        .writingBase(false)
+        .minimumTransactionId(100)
+        .maximumTransactionId(100)
+        .inspector(inspector)
+        .reporter(Reporter.NULL);
+    RecordUpdater updater = new OrcRecordUpdater(root, options);
+    updater.update(100, 10, 30, new MyRow("update"));
+    updater.delete(100, 40, 60);
+    updater.close(false);
+    Path bucketPath = AcidUtils.createFilename(root, options);
+
+    Reader reader = OrcFile.createReader(bucketPath,
+        new OrcFile.ReaderOptions(conf).filesystem(fs));
+    assertEquals(2, reader.getNumberOfRows());
+
+    RecordReader rows = reader.rows();
+
+    // check the contents of the file
+    assertEquals(true, rows.hasNext());
+    OrcStruct row = (OrcStruct) rows.next(null);
+    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+        OrcRecordUpdater.getOperation(row));
+    assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
+    assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row));
+    assertEquals(20, OrcRecordUpdater.getBucket(row));
+    assertEquals(30, OrcRecordUpdater.getRowId(row));
+    assertEquals("update",
+        OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+    assertEquals(true, rows.hasNext());
+    row = (OrcStruct) rows.next(null);
+    assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
+    assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row));
+    assertEquals(20, OrcRecordUpdater.getBucket(row));
+    assertEquals(60, OrcRecordUpdater.getRowId(row));
+    assertEquals(null, OrcRecordUpdater.getRow(row));
+    assertEquals(false, rows.hasNext());
+  }
+}
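
TestOrcRecordUpdater.testWriter above reads the bucket's _flush_length side
file with side.readLong() and caps the ORC reader at that length, and
testRecordReaderIncompleteDelta expects the side file to be 24 bytes after
three flushes, i.e. one 8-byte long per flush. Under that assumption, a reader
bounded to the flushed portion of a still-open delta could be built as in the
sketch below (a hypothetical helper, not part of the commit; it reuses the
imports of the test above plus java.io.IOException):

    // Hypothetical helper, not part of this commit: open an ORC reader on a
    // delta bucket that may still be open for writing, limited to the data
    // covered by the most recent flush. Assumes the _flush_length side file
    // holds one 8-byte long per flush, matching the 24-byte file asserted in
    // testRecordReaderIncompleteDelta.
    static Reader openFlushedPortion(FileSystem fs, Path bucketPath,
                                     Configuration conf) throws IOException {
      Path sidePath = OrcRecordUpdater.getSideFile(bucketPath);
      long flushedLen = -1;
      DataInputStream side = fs.open(sidePath);
      try {
        long flushes = fs.getFileStatus(sidePath).getLen() / 8;
        for (long i = 0; i < flushes; ++i) {
          flushedLen = side.readLong();   // keep the latest flush point
        }
      } finally {
        side.close();
      }
      return OrcFile.createReader(bucketPath,
          new OrcFile.ReaderOptions(conf).filesystem(fs).maxLength(flushedLen));
    }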

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSerDeStats.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSerDeStats.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSerDeStats.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSerDeStats.java Wed Mar 26 18:14:37 2014
@@ -25,6 +25,7 @@ import static junit.framework.Assert.ass
 import java.io.File;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -96,9 +97,7 @@ public class TestOrcSerDeStats {
 
     MiddleStruct(InnerStruct... items) {
       list.clear();
-      for (InnerStruct item : items) {
-        list.add(item);
-      }
+      list.addAll(Arrays.asList(items));
     }
   }
 
@@ -158,9 +157,7 @@ public class TestOrcSerDeStats {
 
   private static List<InnerStruct> list(InnerStruct... items) {
     List<InnerStruct> result = new ArrayList<InnerStruct>();
-    for (InnerStruct s : items) {
-      result.add(s);
-    }
+    result.addAll(Arrays.asList(items));
     return result;
   }
 
@@ -212,7 +209,8 @@ public class TestOrcSerDeStats {
     writer.close();
     assertEquals(4, writer.getNumberOfRows());
     assertEquals(273, writer.getRawDataSize());
-    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
     assertEquals(4, reader.getNumberOfRows());
     assertEquals(273, reader.getRawDataSize());
     assertEquals(15, reader.getRawDataSizeOfColumns(Lists.newArrayList("bytes1")));
@@ -248,7 +246,7 @@ public class TestOrcSerDeStats {
         getStructFieldRef("bytes1").getFieldObjectInspector();
     StringObjectInspector st = (StringObjectInspector) readerInspector.
         getStructFieldRef("string1").getFieldObjectInspector();
-    RecordReader rows = reader.rows(null);
+    RecordReader rows = reader.rows();
     Object row = rows.next(null);
     assertNotNull(row);
     // check the contents of the first row
@@ -310,7 +308,8 @@ public class TestOrcSerDeStats {
     assertEquals(5000, writer.getNumberOfRows());
     assertEquals(430000000, writer.getRawDataSize());
 
-    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
     // stats from reader
     assertEquals(5000, reader.getNumberOfRows());
     assertEquals(430000000, reader.getRawDataSize());
@@ -341,7 +340,8 @@ public class TestOrcSerDeStats {
     assertEquals(1000, writer.getNumberOfRows());
     assertEquals(950000, writer.getRawDataSize());
 
-    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
     // stats from reader
     assertEquals(1000, reader.getNumberOfRows());
     assertEquals(950000, reader.getRawDataSize());
@@ -372,7 +372,8 @@ public class TestOrcSerDeStats {
     assertEquals(1000, writer.getNumberOfRows());
     assertEquals(44500, writer.getRawDataSize());
 
-    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
     // stats from reader
     assertEquals(1000, reader.getNumberOfRows());
     assertEquals(44500, reader.getRawDataSize());
@@ -413,7 +414,8 @@ public class TestOrcSerDeStats {
     long rawDataSize = writer.getRawDataSize();
     assertEquals(2, rowCount);
     assertEquals(1740, rawDataSize);
-    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
 
     assertEquals(2, reader.getNumberOfRows());
     assertEquals(1740, reader.getRawDataSize());
@@ -506,7 +508,8 @@ public class TestOrcSerDeStats {
     long rawDataSize = writer.getRawDataSize();
     assertEquals(2, rowCount);
     assertEquals(1740, rawDataSize);
-    Reader reader = OrcFile.createReader(fs, testFilePath, conf);
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
 
     assertEquals(2, reader.getNumberOfRows());
     assertEquals(1740, reader.getRawDataSize());
@@ -573,7 +576,8 @@ public class TestOrcSerDeStats {
   @Test(expected = ClassCastException.class)
   public void testSerdeStatsOldFormat() throws Exception {
     Path oldFilePath = new Path(HiveTestUtils.getFileFromClasspath("orc-file-11-format.orc"));
-    Reader reader = OrcFile.createReader(fs, oldFilePath, conf);
+    Reader reader = OrcFile.createReader(oldFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
 
     int stripeCount = 0;
     int rowCount = 0;
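
Every hunk in this file makes the same mechanical change: the old three-argument OrcFile.createReader(fs, path, conf) becomes the options-based factory, and rows(null) becomes the no-argument rows(). A minimal sketch of the new call shape, assuming the usual org.apache.hadoop.hive.ql.io.orc imports; the helper name below is illustrative, not from the patch. When the FileSystem is not passed, it is derived from the path, as the TestVectorizedORCReader hunks further down show.

// Old form, removed by this patch:
//   Reader reader = OrcFile.createReader(fs, testFilePath, conf);
//   RecordReader rows = reader.rows(null);
static Reader openOrcReader(Path path, Configuration conf, FileSystem fs)
    throws IOException {
  return OrcFile.createReader(path,
      OrcFile.readerOptions(conf).filesystem(fs));
}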

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java Wed Mar 26 18:14:37 2014
@@ -88,6 +88,8 @@ public class TestOrcSplitElimination {
     conf.set("columns", "userid,string1,subtype,decimal1,ts");
     conf.set("columns.types", "bigint,string,double,decimal,timestamp");
     // needed columns
+    conf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
     conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "userid,subtype");
     fs = FileSystem.getLocal(conf);
     testFilePath = new Path(workDir, "TestOrcFile." +
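
The added lines complete the column-projection setup: alongside the projected column names, the test now states explicitly that not all columns should be read and lists the projected column ids. A hedged sketch of the full triple as a JobConf fragment, mirroring the values in the hunk (nothing beyond plain conf.set calls is assumed):

JobConf conf = new JobConf();
conf.set("columns", "userid,string1,subtype,decimal1,ts");
conf.set("columns.types", "bigint,string,double,decimal,timestamp");
// projection: only userid (id 0) and subtype (id 2) are needed
conf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "userid,subtype");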

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java Wed Mar 26 18:14:37 2014
@@ -22,7 +22,6 @@ import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.Decimal128;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -64,6 +63,7 @@ public class TestVectorizedORCReader {
     fs.delete(testFilePath, false);
   }
 
+  @SuppressWarnings("unused")
   static class MyRecord {
     private final Boolean bo;
     private final Byte by;
@@ -131,10 +131,12 @@ public class TestVectorizedORCReader {
 
   private void checkVectorizedReader() throws Exception {
 
-    Reader vreader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath, conf);
-    Reader reader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath, conf);
-    RecordReaderImpl vrr = (RecordReaderImpl) vreader.rows(null);
-    RecordReaderImpl rr = (RecordReaderImpl) reader.rows(null);
+    Reader vreader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf));
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf));
+    RecordReaderImpl vrr = (RecordReaderImpl) vreader.rows();
+    RecordReaderImpl rr = (RecordReaderImpl) reader.rows();
     VectorizedRowBatch batch = null;
     OrcStruct row = null;
 
@@ -142,7 +144,7 @@ public class TestVectorizedORCReader {
     while (vrr.hasNext()) {
       batch = vrr.nextBatch(batch);
       for (int i = 0; i < batch.size; i++) {
-        row = (OrcStruct) rr.next((Object) row);
+        row = (OrcStruct) rr.next(row);
         for (int j = 0; j < batch.cols.length; j++) {
           Object a = (row.getFieldValue(j));
           Object b = batch.cols[j].getWritableObject(i);
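
The loop above compares the vectorized reader against the row-by-row reader; the key pattern is that nextBatch(batch) is handed the previous batch so the reader can reuse its buffers instead of allocating a new VectorizedRowBatch each time. A hedged, stand-alone sketch of just the counting loop (variable names are illustrative; the types come from the same packages the test imports):

Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
RecordReaderImpl rows = (RecordReaderImpl) reader.rows();
VectorizedRowBatch batch = null;
long rowCount = 0;
while (rows.hasNext()) {
  batch = rows.nextBatch(batch);   // pass the previous batch back for reuse
  rowCount += batch.size;
}
rows.close();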

Modified: hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Wed Mar 26 18:14:37 2014
@@ -28,6 +28,7 @@ import java.net.URL;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -43,6 +44,7 @@ import javax.security.auth.login.LoginEx
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -617,29 +619,11 @@ public class Hadoop20Shims implements Ha
   }
 
   @Override
-  public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
+  public List<FileStatus> listLocatedStatus(final FileSystem fs,
                                                 final Path path,
                                                 final PathFilter filter
                                                ) throws IOException {
-    return new Iterator<FileStatus>() {
-      private final FileStatus[] result = fs.listStatus(path, filter);
-      private int current = 0;
-
-      @Override
-      public boolean hasNext() {
-        return current < result.length;
-      }
-
-      @Override
-      public FileStatus next() {
-        return result[current++];
-      }
-
-      @Override
-      public void remove() {
-        throw new IllegalArgumentException("Not supported");
-      }
-    };
+    return Arrays.asList(fs.listStatus(path, filter));
   }
 
   @Override
@@ -649,6 +633,11 @@ public class Hadoop20Shims implements Ha
   }
 
   @Override
+  public void hflush(FSDataOutputStream stream) throws IOException {
+    stream.sync();
+  }
+
+  @Override
   public void authorizeProxyAccess(String proxyUser, UserGroupInformation realUserUgi,
       String ipAddress, Configuration conf) throws IOException {
     // This hadoop version doesn't have proxy verification

Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Wed Mar 26 18:14:37 2014
@@ -21,16 +21,19 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.net.URI;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -367,29 +370,11 @@ public class Hadoop20SShims extends Hado
   }
 
   @Override
-  public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
-                                                final Path path,
-                                                final PathFilter filter
-  ) throws IOException {
-    return new Iterator<FileStatus>() {
-      private final FileStatus[] result = fs.listStatus(path, filter);
-      private int current = 0;
-
-      @Override
-      public boolean hasNext() {
-        return current < result.length;
-      }
-
-      @Override
-      public FileStatus next() {
-        return result[current++];
-      }
-
-      @Override
-      public void remove() {
-        throw new IllegalArgumentException("Not supported");
-      }
-    };
+  public List<FileStatus> listLocatedStatus(final FileSystem fs,
+                                            final Path path,
+                                            final PathFilter filter
+                                            ) throws IOException {
+    return Arrays.asList(fs.listStatus(path, filter));
   }
 
   @Override
@@ -399,6 +384,11 @@ public class Hadoop20SShims extends Hado
   }
 
   @Override
+  public void hflush(FSDataOutputStream stream) throws IOException {
+    stream.sync();
+  }
+
+  @Override
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
     return new ProxyFileSystem(fs, uri);
   }

Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Wed Mar 26 18:14:37 2014
@@ -22,8 +22,10 @@ import java.lang.Integer;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 import java.net.URI;
@@ -34,6 +36,7 @@ import org.apache.commons.lang.StringUti
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -465,51 +468,19 @@ public class Hadoop23Shims extends Hadoo
   }
 
   @Override
-  public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
-                                                final Path path,
-                                                final PathFilter filter
-  ) throws IOException {
-    return new Iterator<FileStatus>() {
-      private final RemoteIterator<LocatedFileStatus> inner =
-          fs.listLocatedStatus(path);
-      private FileStatus next;
-      {
-        next = null;
-        while (inner.hasNext() && next == null) {
-          next = inner.next();
-          if (filter != null && !filter.accept(next.getPath())) {
-            next = null;
-          }
-        }
-      }
-
-      @Override
-      public boolean hasNext() {
-        return next != null;
-      }
-
-      @Override
-      public FileStatus next() {
-        FileStatus result = next;
-        next = null;
-        try {
-          while (inner.hasNext() && next == null) {
-            next = inner.next();
-            if (filter != null && !filter.accept(next.getPath())) {
-              next = null;
-            }
-          }
-        } catch (IOException ioe) {
-          throw new IllegalArgumentException("Iterator exception", ioe);
-        }
-        return result;
+  public List<FileStatus> listLocatedStatus(final FileSystem fs,
+                                            final Path path,
+                                            final PathFilter filter
+                                           ) throws IOException {
+    RemoteIterator<LocatedFileStatus> itr = fs.listLocatedStatus(path);
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    while(itr.hasNext()) {
+      FileStatus stat = itr.next();
+      if (filter == null || filter.accept(stat.getPath())) {
+        result.add(stat);
       }
-
-      @Override
-      public void remove() {
-        throw new IllegalArgumentException("Not supported");
-      }
-    };
+    }
+    return result;
   }
 
   @Override
@@ -522,6 +493,11 @@ public class Hadoop23Shims extends Hadoo
     }
   }
 
+  @Override
+  public void hflush(FSDataOutputStream stream) throws IOException {
+    stream.hflush();
+  }
+
   class ProxyFileSystem23 extends ProxyFileSystem {
     public ProxyFileSystem23(FileSystem fs) {
       super(fs);
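
All three shims now return a materialized List<FileStatus> from listLocatedStatus instead of a hand-rolled Iterator, and all three gain an hflush method that maps to sync() on the 0.20 branches and to hflush() on 0.23. A hedged caller-side sketch, assuming the usual ShimLoader entry point; the class, method, and variable names below are illustrative, not from the patch.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;

public class ShimUsageSketch {
  static void listAndFlush(FileSystem fs, Path dir, PathFilter filter,
                           FSDataOutputStream out) throws IOException {
    HadoopShims shims = ShimLoader.getHadoopShims();
    // no more Iterator plumbing: the shim hands back a List directly
    List<FileStatus> children = shims.listLocatedStatus(fs, dir, filter);
    for (FileStatus stat : children) {
      System.out.println(stat.getPath());
    }
    // one call works on every supported Hadoop version
    shims.hflush(out);
  }
}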

Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Wed Mar 26 18:14:37 2014
@@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -448,11 +449,11 @@ public interface HadoopShims {
    * @param fs the file system
    * @param path the directory name to get the status and block locations
    * @param filter a filter that needs to accept the file (or null)
-   * @return an iterator for the located file status objects
+   * @return a list of the located file status objects
    * @throws IOException
    */
-  Iterator<FileStatus> listLocatedStatus(FileSystem fs, Path path,
-      PathFilter filter) throws IOException;
+  List<FileStatus> listLocatedStatus(FileSystem fs, Path path,
+                                     PathFilter filter) throws IOException;
 
   /**
    * For file status returned by listLocatedStatus, convert them into a list
@@ -465,6 +466,13 @@ public interface HadoopShims {
   BlockLocation[] getLocations(FileSystem fs,
       FileStatus status) throws IOException;
 
+  /**
+   * Flush the given stream and make its changes visible to other users.
+   * @param stream the stream to hflush.
+   * @throws IOException
+   */
+  public void hflush(FSDataOutputStream stream) throws IOException;
+
   public HCatHadoopShims getHCatShim();
   public interface HCatHadoopShims {