You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/03/30 21:20:59 UTC

[1/2] hive git commit: HIVE-10249 ACID: show locks should show who the lock is waiting for (Eugene Koifman, reviewed by Wei Zheng)

Repository: hive
Updated Branches:
  refs/heads/branch-1 a27595115 -> b6f6c4acb


http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig
deleted file mode 100644
index 15ee24c..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig
+++ /dev/null
@@ -1,1150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.io.RecordUpdater;
-import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
-import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
-import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.junit.Test;
-import org.mockito.MockSettings;
-import org.mockito.Mockito;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-
-public class TestOrcRawRecordMerger {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestOrcRawRecordMerger.class);
-//todo: why is statementId -1?
-  @Test
-  public void testOrdering() throws Exception {
-    ReaderKey left = new ReaderKey(100, 200, 1200, 300);
-    ReaderKey right = new ReaderKey();
-    right.setValues(100, 200, 1000, 200,1);
-    assertTrue(right.compareTo(left) < 0);
-    assertTrue(left.compareTo(right) > 0);
-    assertEquals(false, left.equals(right));
-    left.set(right);
-    assertTrue(right.compareTo(left) == 0);
-    assertEquals(true, right.equals(left));
-    right.setRowId(2000);
-    assertTrue(right.compareTo(left) > 0);
-    left.setValues(1, 2, 3, 4,-1);
-    right.setValues(100, 2, 3, 4,-1);
-    assertTrue(left.compareTo(right) < 0);
-    assertTrue(right.compareTo(left) > 0);
-    left.setValues(1, 2, 3, 4,-1);
-    right.setValues(1, 100, 3, 4,-1);
-    assertTrue(left.compareTo(right) < 0);
-    assertTrue(right.compareTo(left) > 0);
-    left.setValues(1, 2, 3, 100,-1);
-    right.setValues(1, 2, 3, 4,-1);
-    assertTrue(left.compareTo(right) < 0);
-    assertTrue(right.compareTo(left) > 0);
-
-    // ensure that we are consistent when comparing to the base class
-    RecordIdentifier ri = new RecordIdentifier(1, 2, 3);
-    assertEquals(1, ri.compareTo(left));
-    assertEquals(-1, left.compareTo(ri));
-    assertEquals(false, ri.equals(left));
-    assertEquals(false, left.equals(ri));
-  }
-
-  private static void setRow(OrcStruct event,
-                             int operation,
-                             long originalTransaction,
-                             int bucket,
-                             long rowId,
-                             long currentTransaction,
-                             String value) {
-    event.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
-    event.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
-        new LongWritable(originalTransaction));
-    event.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucket));
-    event.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(rowId));
-    event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
-        new LongWritable(currentTransaction));
-    OrcStruct row = new OrcStruct(1);
-    row.setFieldValue(0, new Text(value));
-    event.setFieldValue(OrcRecordUpdater.ROW, row);
-  }
-
-  private static String value(OrcStruct event) {
-    return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
-  }
-
-  private List<StripeInformation> createStripes(long... rowCounts) {
-    long offset = 0;
-    List<StripeInformation> result =
-        new ArrayList<StripeInformation>(rowCounts.length);
-    for(long count: rowCounts) {
-      OrcProto.StripeInformation.Builder stripe =
-          OrcProto.StripeInformation.newBuilder();
-      stripe.setDataLength(800).setIndexLength(100).setFooterLength(100)
-          .setNumberOfRows(count).setOffset(offset);
-      offset += 1000;
-      result.add(new ReaderImpl.StripeInformationImpl(stripe.build()));
-    }
-    return result;
-  }
-
-  // can add .verboseLogging() to cause Mockito to log invocations
-  private final MockSettings settings = Mockito.withSettings();
-  private final Path tmpDir = new Path(System.getProperty("test.tmp.dir",
-      "target" + File.separator + "test" + File.separator + "tmp"));
-
-  private Reader createMockReader() throws IOException {
-    Reader reader = Mockito.mock(Reader.class, settings);
-    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
-    OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
-    OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
-    OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
-    OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
-    OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
-    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
-        .thenReturn(recordReader);
-
-    Mockito.when(recordReader.hasNext()).
-        thenReturn(true, true, true, true, true, false);
-
-    Mockito.when(recordReader.getProgress()).thenReturn(1.0f);
-
-    Mockito.when(recordReader.next(null)).thenReturn(row1);
-    Mockito.when(recordReader.next(row1)).thenReturn(row2);
-    Mockito.when(recordReader.next(row2)).thenReturn(row3);
-    Mockito.when(recordReader.next(row3)).thenReturn(row4);
-    Mockito.when(recordReader.next(row4)).thenReturn(row5);
-
-    return reader;
-  }
-
-  @Test
-  public void testReaderPair() throws Exception {
-    ReaderKey key = new ReaderKey();
-    Reader reader = createMockReader();
-    RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
-    RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
-    ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
-        new Reader.Options(), 0);
-    RecordReader recordReader = pair.recordReader;
-    assertEquals(10, key.getTransactionId());
-    assertEquals(20, key.getBucketId());
-    assertEquals(40, key.getRowId());
-    assertEquals(120, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
-
-    pair.next(pair.nextRecord);
-    assertEquals(40, key.getTransactionId());
-    assertEquals(50, key.getBucketId());
-    assertEquals(60, key.getRowId());
-    assertEquals(130, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
-
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
-    Mockito.verify(recordReader).close();
-  }
-
-  @Test
-  public void testReaderPairNoMin() throws Exception {
-    ReaderKey key = new ReaderKey();
-    Reader reader = createMockReader();
-
-    ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
-        new Reader.Options(), 0);
-    RecordReader recordReader = pair.recordReader;
-    assertEquals(10, key.getTransactionId());
-    assertEquals(20, key.getBucketId());
-    assertEquals(20, key.getRowId());
-    assertEquals(100, key.getCurrentTransactionId());
-    assertEquals("first", value(pair.nextRecord));
-
-    pair.next(pair.nextRecord);
-    assertEquals(10, key.getTransactionId());
-    assertEquals(20, key.getBucketId());
-    assertEquals(30, key.getRowId());
-    assertEquals(110, key.getCurrentTransactionId());
-    assertEquals("second", value(pair.nextRecord));
-
-    pair.next(pair.nextRecord);
-    assertEquals(10, key.getTransactionId());
-    assertEquals(20, key.getBucketId());
-    assertEquals(40, key.getRowId());
-    assertEquals(120, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
-
-    pair.next(pair.nextRecord);
-    assertEquals(40, key.getTransactionId());
-    assertEquals(50, key.getBucketId());
-    assertEquals(60, key.getRowId());
-    assertEquals(130, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
-
-    pair.next(pair.nextRecord);
-    assertEquals(40, key.getTransactionId());
-    assertEquals(50, key.getBucketId());
-    assertEquals(61, key.getRowId());
-    assertEquals(140, key.getCurrentTransactionId());
-    assertEquals("fifth", value(pair.nextRecord));
-
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
-    Mockito.verify(recordReader).close();
-  }
-
-  private static OrcStruct createOriginalRow(String value) {
-    OrcStruct result = new OrcStruct(1);
-    result.setFieldValue(0, new Text(value));
-    return result;
-  }
-
-  private Reader createMockOriginalReader() throws IOException {
-    Reader reader = Mockito.mock(Reader.class, settings);
-    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
-    OrcStruct row1 = createOriginalRow("first");
-    OrcStruct row2 = createOriginalRow("second");
-    OrcStruct row3 = createOriginalRow("third");
-    OrcStruct row4 = createOriginalRow("fourth");
-    OrcStruct row5 = createOriginalRow("fifth");
-
-    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
-        .thenReturn(recordReader);
-    Mockito.when(recordReader.hasNext()).
-        thenReturn(true, true, true, true, true, false);
-    Mockito.when(recordReader.getRowNumber()).thenReturn(0L, 1L, 2L, 3L, 4L);
-    Mockito.when(recordReader.next(null)).thenReturn(row1);
-    Mockito.when(recordReader.next(row1)).thenReturn(row2);
-    Mockito.when(recordReader.next(row2)).thenReturn(row3);
-    Mockito.when(recordReader.next(row3)).thenReturn(row4);
-    Mockito.when(recordReader.next(row4)).thenReturn(row5);
-    return reader;
-  }
-
-  @Test
-  public void testOriginalReaderPair() throws Exception {
-    ReaderKey key = new ReaderKey();
-    Reader reader = createMockOriginalReader();
-    RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
-    RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
-    boolean[] includes = new boolean[]{true, true};
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
-        new Reader.Options().include(includes));
-    RecordReader recordReader = pair.recordReader;
-    assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
-    assertEquals(2, key.getRowId());
-    assertEquals(0, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
-
-    pair.next(pair.nextRecord);
-    assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
-    assertEquals(3, key.getRowId());
-    assertEquals(0, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
-
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
-    Mockito.verify(recordReader).close();
-  }
-
-  private static ValidTxnList createMaximalTxnList() {
-    return new ValidReadTxnList(Long.MAX_VALUE + ":");
-  }
-
-  @Test
-  public void testOriginalReaderPairNoMin() throws Exception {
-    ReaderKey key = new ReaderKey();
-    Reader reader = createMockOriginalReader();
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
-        new Reader.Options());
-    assertEquals("first", value(pair.nextRecord));
-    assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
-    assertEquals(0, key.getRowId());
-    assertEquals(0, key.getCurrentTransactionId());
-
-    pair.next(pair.nextRecord);
-    assertEquals("second", value(pair.nextRecord));
-    assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
-    assertEquals(1, key.getRowId());
-    assertEquals(0, key.getCurrentTransactionId());
-
-    pair.next(pair.nextRecord);
-    assertEquals("third", value(pair.nextRecord));
-    assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
-    assertEquals(2, key.getRowId());
-    assertEquals(0, key.getCurrentTransactionId());
-
-    pair.next(pair.nextRecord);
-    assertEquals("fourth", value(pair.nextRecord));
-    assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
-    assertEquals(3, key.getRowId());
-    assertEquals(0, key.getCurrentTransactionId());
-
-    pair.next(pair.nextRecord);
-    assertEquals("fifth", value(pair.nextRecord));
-    assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
-    assertEquals(4, key.getRowId());
-    assertEquals(0, key.getCurrentTransactionId());
-
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
-    Mockito.verify(pair.recordReader).close();
-  }
-
-  @Test
-  public void testNewBase() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set("columns", "col1");
-    conf.set("columns.types", "string");
-    Reader reader = Mockito.mock(Reader.class, settings);
-    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
-
-    List<OrcProto.Type> types = new ArrayList<OrcProto.Type>();
-    OrcProto.Type.Builder typeBuilder = OrcProto.Type.newBuilder();
-    typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1)
-        .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5)
-        .addSubtypes(6);
-    types.add(typeBuilder.build());
-    types.add(null);
-    types.add(null);
-    types.add(null);
-    types.add(null);
-    types.add(null);
-    typeBuilder.clearSubtypes();
-    typeBuilder.addSubtypes(7);
-    types.add(typeBuilder.build());
-
-    Mockito.when(reader.getTypes()).thenReturn(types);
-    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
-        .thenReturn(recordReader);
-
-    OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
-    OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
-    OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
-    OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
-    OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
-
-    Mockito.when(recordReader.hasNext()).
-        thenReturn(true, true, true, true, true, false);
-
-    Mockito.when(recordReader.getProgress()).thenReturn(1.0f);
-
-    Mockito.when(recordReader.next(null)).thenReturn(row1, row4);
-    Mockito.when(recordReader.next(row1)).thenReturn(row2);
-    Mockito.when(recordReader.next(row2)).thenReturn(row3);
-    Mockito.when(recordReader.next(row3)).thenReturn(row5);
-
-    Mockito.when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
-        .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61"
-            .getBytes("UTF-8")));
-    Mockito.when(reader.getStripes())
-        .thenReturn(createStripes(2, 2, 1));
-
-    OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
-        false, 10, createMaximalTxnList(),
-        new Reader.Options().range(1000, 1000), null);
-    RecordReader rr = merger.getCurrentReader().recordReader;
-    assertEquals(0, merger.getOtherReaders().size());
-
-    assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
-    assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey());
-    RecordIdentifier id = merger.createKey();
-    OrcStruct event = merger.createValue();
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(10, id.getTransactionId());
-    assertEquals(20, id.getBucketId());
-    assertEquals(40, id.getRowId());
-    assertEquals("third", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(40, id.getTransactionId());
-    assertEquals(50, id.getBucketId());
-    assertEquals(60, id.getRowId());
-    assertEquals("fourth", getValue(event));
-
-    assertEquals(false, merger.next(id, event));
-    assertEquals(1.0, merger.getProgress(), 0.01);
-    merger.close();
-    Mockito.verify(rr).close();
-    Mockito.verify(rr).getProgress();
-
-    StructObjectInspector eventObjectInspector =
-        (StructObjectInspector) merger.getObjectInspector();
-    List<? extends StructField> fields =
-        eventObjectInspector.getAllStructFieldRefs();
-    assertEquals(OrcRecordUpdater.FIELDS, fields.size());
-    assertEquals("operation",
-        fields.get(OrcRecordUpdater.OPERATION).getFieldName());
-    assertEquals("currentTransaction",
-        fields.get(OrcRecordUpdater.CURRENT_TRANSACTION).getFieldName());
-    assertEquals("originalTransaction",
-        fields.get(OrcRecordUpdater.ORIGINAL_TRANSACTION).getFieldName());
-    assertEquals("bucket",
-        fields.get(OrcRecordUpdater.BUCKET).getFieldName());
-    assertEquals("rowId",
-        fields.get(OrcRecordUpdater.ROW_ID).getFieldName());
-    StructObjectInspector rowObjectInspector =
-        (StructObjectInspector) fields.get(OrcRecordUpdater.ROW)
-            .getFieldObjectInspector();
-    assertEquals("col1",
-        rowObjectInspector.getAllStructFieldRefs().get(0).getFieldName());
-  }
-
-  static class MyRow {
-    Text col1;
-    RecordIdentifier ROW__ID;
-
-    MyRow(String val) {
-      col1 = new Text(val);
-    }
-
-    MyRow(String val, long rowId, long origTxn, int bucket) {
-      col1 = new Text(val);
-      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
-    }
-  }
-
-  static String getValue(OrcStruct event) {
-    return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
-  }
-
-  @Test
-  public void testEmpty() throws Exception {
-    final int BUCKET = 0;
-    Configuration conf = new Configuration();
-    OrcOutputFormat of = new OrcOutputFormat();
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path root = new Path(tmpDir, "testEmpty").makeQualified(fs);
-    fs.delete(root, true);
-    ObjectInspector inspector;
-    synchronized (TestOrcFile.class) {
-      inspector = ObjectInspectorFactory.getReflectionObjectInspector
-          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    }
-
-    // write the empty base
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
-        .inspector(inspector).bucket(BUCKET).writingBase(true)
-        .maximumTransactionId(100).finalDestination(root);
-    of.getRecordUpdater(root, options).close(false);
-
-    ValidTxnList txnList = new ValidReadTxnList("200:");
-    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
-
-    Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
-        BUCKET);
-    Reader baseReader = OrcFile.createReader(basePath,
-        OrcFile.readerOptions(conf));
-    OrcRawRecordMerger merger =
-        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
-            createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
-    RecordIdentifier key = merger.createKey();
-    OrcStruct value = merger.createValue();
-    assertEquals(false, merger.next(key, value));
-  }
-
-  /**
-   * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is
-   * a base and a delta.
-   * @throws Exception
-   */
-  @Test
-  public void testNewBaseAndDelta() throws Exception {
-    testNewBaseAndDelta(false);
-    testNewBaseAndDelta(true);
-  }
-  private void testNewBaseAndDelta(boolean use130Format) throws Exception {
-    final int BUCKET = 10;
-    String[] values = new String[]{"first", "second", "third", "fourth",
-                                   "fifth", "sixth", "seventh", "eighth",
-                                   "ninth", "tenth"};
-    Configuration conf = new Configuration();
-    OrcOutputFormat of = new OrcOutputFormat();
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path root = new Path(tmpDir, "testNewBaseAndDelta").makeQualified(fs);
-    fs.delete(root, true);
-    ObjectInspector inspector;
-    synchronized (TestOrcFile.class) {
-      inspector = ObjectInspectorFactory.getReflectionObjectInspector
-          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    }
-
-    // write the base
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
-        .inspector(inspector).bucket(BUCKET).finalDestination(root);
-    if(!use130Format) {
-      options.statementId(-1);
-    }
-    RecordUpdater ru = of.getRecordUpdater(root,
-        options.writingBase(true).maximumTransactionId(100));
-    for(String v: values) {
-      ru.insert(0, new MyRow(v));
-    }
-    ru.close(false);
-
-    // write a delta
-    ru = of.getRecordUpdater(root, options.writingBase(false)
-        .minimumTransactionId(200).maximumTransactionId(200).recordIdColumn(1));
-    ru.update(200, new MyRow("update 1", 0, 0, BUCKET));
-    ru.update(200, new MyRow("update 2", 2, 0, BUCKET));
-    ru.update(200, new MyRow("update 3", 3, 0, BUCKET));
-    ru.delete(200, new MyRow("", 7, 0, BUCKET));
-    ru.delete(200, new MyRow("", 8, 0, BUCKET));
-    ru.close(false);
-
-    ValidTxnList txnList = new ValidReadTxnList("200:");
-    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
-
-    assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
-    assertEquals(new Path(root, use130Format ?
-        AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
-        directory.getCurrentDirectories().get(0).getPath());
-
-    Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
-        BUCKET);
-    Reader baseReader = OrcFile.createReader(basePath,
-        OrcFile.readerOptions(conf));
-    OrcRawRecordMerger merger =
-        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
-            createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
-    assertEquals(null, merger.getMinKey());
-    assertEquals(null, merger.getMaxKey());
-    RecordIdentifier id = merger.createKey();
-    OrcStruct event = merger.createValue();
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
-    assertEquals("update 1", getValue(event));
-    assertFalse(merger.isDelete(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
-    assertEquals("second", getValue(event));
-    assertFalse(merger.isDelete(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
-    assertEquals("update 2", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
-    assertEquals("update 3", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
-    assertEquals("fifth", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
-    assertEquals("sixth", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
-    assertEquals("seventh", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
-    assertNull(OrcRecordUpdater.getRow(event));
-    assertTrue(merger.isDelete(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
-    assertNull(OrcRecordUpdater.getRow(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
-    assertEquals("tenth", getValue(event));
-
-    assertEquals(false, merger.next(id, event));
-    merger.close();
-
-    // make a merger that doesn't collapse events
-    merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
-            createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
-    assertEquals("update 1", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 0, 0), id);
-    assertEquals("first", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
-    assertEquals("second", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
-    assertEquals("update 2", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 2, 0), id);
-    assertEquals("third", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
-    assertEquals("update 3", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 3, 0), id);
-    assertEquals("fourth", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
-    assertEquals("fifth", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
-    assertEquals("sixth", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
-    assertEquals("seventh", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
-    assertNull(OrcRecordUpdater.getRow(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 7, 0), id);
-    assertEquals("eighth", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
-    assertNull(OrcRecordUpdater.getRow(event));
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 8, 0), id);
-    assertEquals("ninth", getValue(event));
-
-    assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
-    assertEquals("tenth", getValue(event));
-
-    assertEquals(false, merger.next(id, event));
-    merger.close();
-
-    // try ignoring the 200 transaction and make sure it works still
-    ValidTxnList txns = new ValidReadTxnList("2000:200");
-    merger =
-        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
-            txns, new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
-    for(int i=0; i < values.length; ++i) {
-      assertEquals(true, merger.next(id, event));
-      LOG.info("id = " + id + "event = " + event);
-      assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-          OrcRecordUpdater.getOperation(event));
-      assertEquals(new ReaderKey(0, BUCKET, i, 0), id);
-      assertEquals(values[i], getValue(event));
-    }
-
-    assertEquals(false, merger.next(id, event));
-    merger.close();
-  }
-
-  static class BigRow {
-    int myint;
-    long mylong;
-    Text mytext;
-    float myfloat;
-    double mydouble;
-    RecordIdentifier ROW__ID;
-
-    BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble) {
-      this.myint = myint;
-      this.mylong = mylong;
-      this.mytext = new Text(mytext);
-      this.myfloat = myfloat;
-      this.mydouble = mydouble;
-      ROW__ID = null;
-    }
-
-    BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble,
-                    long rowId, long origTxn, int bucket) {
-      this.myint = myint;
-      this.mylong = mylong;
-      this.mytext = new Text(mytext);
-      this.myfloat = myfloat;
-      this.mydouble = mydouble;
-      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
-    }
-
-    BigRow(long rowId, long origTxn, int bucket) {
-      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
-    }
-  }
-
-  /**
-   * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is
-   * a base and a delta.
-   * @throws Exception
-   */
-  @Test
-  public void testRecordReaderOldBaseAndDelta() throws Exception {
-    final int BUCKET = 10;
-    Configuration conf = new Configuration();
-    OrcOutputFormat of = new OrcOutputFormat();
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path root = new Path(tmpDir, "testOldBaseAndDelta").makeQualified(fs);
-    fs.delete(root, true);
-    ObjectInspector inspector;
-    synchronized (TestOrcFile.class) {
-      inspector = ObjectInspectorFactory.getReflectionObjectInspector
-          (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    }
-
-    // write the base
-    MemoryManager mgr = new MemoryManager(conf){
-      int rowsAddedSinceCheck = 0;
-
-      @Override
-      synchronized void addedRow(int rows) throws IOException {
-        rowsAddedSinceCheck += rows;
-        if (rowsAddedSinceCheck >= 2) {
-          notifyWriters();
-          rowsAddedSinceCheck = 0;
-        }
-      }
-    };
-    // make 5 stripes with 2 rows each
-    Writer writer = OrcFile.createWriter(new Path(root, "0000010_0"),
-        OrcFile.writerOptions(conf).inspector(inspector).fileSystem(fs)
-        .blockPadding(false).bufferSize(10000).compress(CompressionKind.NONE)
-        .stripeSize(1).memory(mgr).version(OrcFile.Version.V_0_11));
-    String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
-       "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
-    for(int i=0; i < values.length; ++i) {
-      writer.addRow(new BigRow(i, i, values[i], i, i));
-    }
-    writer.close();
-
-    // write a delta
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
-        .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
-        .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
-    RecordUpdater ru = of.getRecordUpdater(root, options);
-    values = new String[]{"0.0", null, null, "1.1", null, null, null,
-        "ignore.7"};
-    for(int i=0; i < values.length; ++i) {
-      if (values[i] != null) {
-        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
-      }
-    }
-    ru.delete(100, new BigRow(9, 0, BUCKET));
-    ru.close(false);
-
-    // write a delta
-    options = options.minimumTransactionId(2).maximumTransactionId(2);
-    ru = of.getRecordUpdater(root, options);
-    values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
-    for(int i=0; i < values.length; ++i) {
-      if (values[i] != null) {
-        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
-      }
-    }
-    ru.delete(100, new BigRow(8, 0, BUCKET));
-    ru.close(false);
-
-    InputFormat inf = new OrcInputFormat();
-    JobConf job = new JobConf();
-    job.set("mapred.min.split.size", "1");
-    job.set("mapred.max.split.size", "2");
-    job.set("mapred.input.dir", root.toString());
-    InputSplit[] splits = inf.getSplits(job, 5);
-    assertEquals(5, splits.length);
-    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
-
-    // loop through the 5 splits and read each
-    for(int i=0; i < 4; ++i) {
-      System.out.println("starting split " + i);
-      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
-      NullWritable key = rr.createKey();
-      OrcStruct value = rr.createValue();
-
-      // there should be exactly two rows per a split
-      for(int j=0; j < 2; ++j) {
-        System.out.println("i = " + i + ", j = " + j);
-        assertEquals(true, rr.next(key, value));
-        System.out.println("record = " + value);
-        assertEquals(i + "." + j, value.getFieldValue(2).toString());
-      }
-      assertEquals(false, rr.next(key, value));
-    }
-    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
-    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
-  }
-
-  /**
-   * Test the RecordReader when there is a new base and a delta.
-   * @throws Exception
-   */
-  @Test
-  public void testRecordReaderNewBaseAndDelta() throws Exception {
-    final int BUCKET = 11;
-    Configuration conf = new Configuration();
-    OrcOutputFormat of = new OrcOutputFormat();
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path root = new Path(tmpDir, "testRecordReaderNewBaseAndDelta").makeQualified(fs);
-    fs.delete(root, true);
-    ObjectInspector inspector;
-    synchronized (TestOrcFile.class) {
-      inspector = ObjectInspectorFactory.getReflectionObjectInspector
-          (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    }
-
-    // write the base
-    MemoryManager mgr = new MemoryManager(conf){
-      int rowsAddedSinceCheck = 0;
-
-      @Override
-      synchronized void addedRow(int rows) throws IOException {
-        rowsAddedSinceCheck += rows;
-        if (rowsAddedSinceCheck >= 2) {
-          notifyWriters();
-          rowsAddedSinceCheck = 0;
-        }
-      }
-    };
-
-    // make 5 stripes with 2 rows each
-    OrcRecordUpdater.OrcOptions options = (OrcRecordUpdater.OrcOptions)
-        new OrcRecordUpdater.OrcOptions(conf)
-        .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
-        .bucket(BUCKET).inspector(inspector).filesystem(fs);
-    options.orcOptions(OrcFile.writerOptions(conf)
-      .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
-      .memory(mgr));
-    options.finalDestination(root);
-    RecordUpdater ru = of.getRecordUpdater(root, options);
-    String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
-        "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(0, new BigRow(i, i, values[i], i, i));
-    }
-    ru.close(false);
-
-    // write a delta
-    options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1).recordIdColumn(5);
-    ru = of.getRecordUpdater(root, options);
-    values = new String[]{"0.0", null, null, "1.1", null, null, null,
-        "ignore.7"};
-    for(int i=0; i < values.length; ++i) {
-      if (values[i] != null) {
-        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
-      }
-    }
-    ru.delete(100, new BigRow(9, 0, BUCKET));
-    ru.close(false);
-
-    // write a delta
-    options.minimumTransactionId(2).maximumTransactionId(2);
-    ru = of.getRecordUpdater(root, options);
-    values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
-    for(int i=0; i < values.length; ++i) {
-      if (values[i] != null) {
-        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
-      }
-    }
-    ru.delete(100, new BigRow(8, 0, BUCKET));
-    ru.close(false);
-
-    InputFormat inf = new OrcInputFormat();
-    JobConf job = new JobConf();
-    job.set("mapred.min.split.size", "1");
-    job.set("mapred.max.split.size", "2");
-    job.set("mapred.input.dir", root.toString());
-    InputSplit[] splits = inf.getSplits(job, 5);
-    assertEquals(5, splits.length);
-    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
-
-    // loop through the 5 splits and read each
-    for(int i=0; i < 4; ++i) {
-      System.out.println("starting split " + i);
-      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
-      NullWritable key = rr.createKey();
-      OrcStruct value = rr.createValue();
-
-      // there should be exactly two rows per a split
-      for(int j=0; j < 2; ++j) {
-        System.out.println("i = " + i + ", j = " + j);
-        assertEquals(true, rr.next(key, value));
-        System.out.println("record = " + value);
-        assertEquals(i + "." + j, value.getFieldValue(2).toString());
-      }
-      assertEquals(false, rr.next(key, value));
-    }
-    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
-    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
-  }
-
-  /**
-   * Test the RecordReader when there is a new base and a delta.
-   * @throws Exception
-   */
-  @Test
-  public void testRecordReaderDelta() throws Exception {
-    final int BUCKET = 0;
-    Configuration conf = new Configuration();
-    OrcOutputFormat of = new OrcOutputFormat();
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path root = new Path(tmpDir, "testRecordReaderDelta").makeQualified(fs);
-    fs.delete(root, true);
-    ObjectInspector inspector;
-    synchronized (TestOrcFile.class) {
-      inspector = ObjectInspectorFactory.getReflectionObjectInspector
-          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    }
-
-    // write a delta
-    AcidOutputFormat.Options options =
-        new AcidOutputFormat.Options(conf)
-            .bucket(BUCKET).inspector(inspector).filesystem(fs)
-            .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
-          .finalDestination(root);
-    RecordUpdater ru = of.getRecordUpdater(root, options);
-    String[] values = new String[]{"a", "b", "c", "d", "e"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(1, new MyRow(values[i]));
-    }
-    ru.close(false);
-
-    // write a delta
-    options.minimumTransactionId(2).maximumTransactionId(2);
-    ru = of.getRecordUpdater(root, options);
-    values = new String[]{"f", "g", "h", "i", "j"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(2, new MyRow(values[i]));
-    }
-    ru.close(false);
-
-    InputFormat inf = new OrcInputFormat();
-    JobConf job = new JobConf();
-    job.set("mapred.min.split.size", "1");
-    job.set("mapred.max.split.size", "2");
-    job.set("mapred.input.dir", root.toString());
-    job.set("bucket_count", "1");
-    InputSplit[] splits = inf.getSplits(job, 5);
-    assertEquals(1, splits.length);
-    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
-    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
-    values = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
-    OrcStruct row = rr.createValue();
-    for(int i = 0; i < values.length; ++i) {
-      System.out.println("Checking " + i);
-      assertEquals(true, rr.next(NullWritable.get(), row));
-      assertEquals(values[i], row.getFieldValue(0).toString());
-    }
-    assertEquals(false, rr.next(NullWritable.get(), row));
-  }
-
-  /**
-   * Test the RecordReader when the delta has been flushed, but not closed.
-   * @throws Exception
-   */
-  @Test
-  public void testRecordReaderIncompleteDelta() throws Exception {
-    testRecordReaderIncompleteDelta(false);
-    testRecordReaderIncompleteDelta(true);
-  }
-  /**
-   * 
-   * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_00001
-   */
-  private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception {
-    final int BUCKET = 1;
-    Configuration conf = new Configuration();
-    OrcOutputFormat of = new OrcOutputFormat();
-    FileSystem fs = FileSystem.getLocal(conf).getRaw();
-    Path root = new Path(tmpDir, "testRecordReaderIncompleteDelta").makeQualified(fs);
-    fs.delete(root, true);
-    ObjectInspector inspector;
-    synchronized (TestOrcFile.class) {
-      inspector = ObjectInspectorFactory.getReflectionObjectInspector
-          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    }
-
-    // write a base
-    AcidOutputFormat.Options options =
-        new AcidOutputFormat.Options(conf)
-            .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
-            .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root);
-    if(!use130Format) {
-      options.statementId(-1);
-    }
-    RecordUpdater ru = of.getRecordUpdater(root, options);
-    String[] values= new String[]{"1", "2", "3", "4", "5"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(0, new MyRow(values[i]));
-    }
-    ru.close(false);
-
-    // write a delta
-    options.writingBase(false).minimumTransactionId(10)
-        .maximumTransactionId(19);
-    ru = of.getRecordUpdater(root, options);
-    values = new String[]{"6", "7", "8"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(1, new MyRow(values[i]));
-    }
-    InputFormat inf = new OrcInputFormat();
-    JobConf job = new JobConf();
-    job.set("mapred.input.dir", root.toString());
-    job.set("bucket_count", "2");
-
-    // read the keys before the delta is flushed
-    InputSplit[] splits = inf.getSplits(job, 1);
-    assertEquals(2, splits.length);
-    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
-        inf.getRecordReader(splits[0], job, Reporter.NULL);
-    NullWritable key = rr.createKey();
-    OrcStruct value = rr.createValue();
-    System.out.println("Looking at split " + splits[0]);
-    for(int i=1; i < 6; ++i) {
-      System.out.println("Checking row " + i);
-      assertEquals(true, rr.next(key, value));
-      assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
-    }
-    assertEquals(false, rr.next(key, value));
-
-    ru.flush();
-    ru.flush();
-    values = new String[]{"9", "10"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(3, new MyRow(values[i]));
-    }
-    ru.flush();
-
-    splits = inf.getSplits(job, 1);
-    assertEquals(2, splits.length);
-    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
-    Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
-      AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
-    assertEquals(true, fs.exists(sideFile));
-    assertEquals(24, fs.getFileStatus(sideFile).getLen());
-
-    for(int i=1; i < 11; ++i) {
-      assertEquals(true, rr.next(key, value));
-      assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
-    }
-    assertEquals(false, rr.next(key, value));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 5b775f9..0a91348 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -241,6 +241,34 @@ public class TestDbTxnManager2 {
     otherTxnMgr.closeTxnManager();
   }
 
+  /**
+   * check that locks in Waiting state show what they are waiting on
+   * This test is somewhat abusive in that it makes DbLockManager retain locks for 2
+   * different queries (which are not part of the same transaction) which can never
+   * happen in real use cases... but it makes testing convenient.
+   * @throws Exception
+   */
+  @Test
+  public void testLockBlockedBy() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table TAB_BLOCKED (a int, b int) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.compileAndRespond("select * from TAB_BLOCKED");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "I AM SAM");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks.get(0));
+    cpr = driver.compileAndRespond("drop table TAB_BLOCKED");
+    checkCmdOnDriver(cpr);
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks.get(0));
+    checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "TAB_BLOCKED", null, locks.get(1));
+    Assert.assertEquals("BlockedByExtId doesn't match", locks.get(0).getLockid(), locks.get(1).getBlockedByExtId());
+    Assert.assertEquals("BlockedByIntId doesn't match", locks.get(0).getLockIdInternal(), locks.get(1).getBlockedByIntId());
+  }
+
   @Test
   public void testDummyTxnManagerOnAcidTable() throws Exception {
     // Create an ACID table with DbTxnManager

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
index d9d2ed6..46d8ea1 100644
--- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
+++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
@@ -2,17 +2,17 @@ PREHOOK: query: show locks
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User
 PREHOOK: query: show locks extended
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks extended
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User
 PREHOOK: query: show locks default
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks default
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User
 PREHOOK: query: show transactions
 PREHOOK: type: SHOW TRANSACTIONS
 POSTHOOK: query: show transactions


[2/2] hive git commit: HIVE-10249 ACID: show locks should show who the lock is waiting for (Eugene Koifman, reviewed by Wei Zheng)

Posted by ek...@apache.org.
HIVE-10249 ACID: show locks should show who the lock is waiting for (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b6f6c4ac
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b6f6c4ac
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b6f6c4ac

Branch: refs/heads/branch-1
Commit: b6f6c4acba3b863851d197a7e6e2e9f4b851da70
Parents: a275951
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Wed Mar 30 12:18:20 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Wed Mar 30 12:18:20 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |    6 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |   46 +-
 .../hive/metastore/txn/TestTxnHandler.java      |    2 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   16 +-
 .../exec/vector/VectorizedBatchUtil.java.orig   |  707 -----------
 .../ql/parse/LoadSemanticAnalyzer.java.orig     |  360 ------
 .../ql/io/orc/TestOrcRawRecordMerger.java.orig  | 1150 ------------------
 .../hive/ql/lockmgr/TestDbTxnManager2.java      |   28 +
 .../clientpositive/dbtxnmgr_showlocks.q.out     |    6 +-
 9 files changed, 89 insertions(+), 2232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 56c9ed8..2e24678 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -103,7 +103,11 @@ public final class TxnDbUtil {
           " HL_ACQUIRED_AT bigint," +
           " HL_USER varchar(128) NOT NULL," +
           " HL_HOST varchar(128) NOT NULL," +
-          " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+          " HL_HEARTBEAT_COUNT integer," +
+          " HL_AGENT_INFO varchar(128)," +
+          " HL_BLOCKEDBY_EXT_ID bigint," +
+          " HL_BLOCKEDBY_INT_ID bigint," +
+        " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
       stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
 
       stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)");

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index a3b0751..ed4a3c2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -869,8 +869,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    */
   private static class LockInfoExt extends LockInfo {
     private final ShowLocksResponseElement e;
-    LockInfoExt(ShowLocksResponseElement e, long intLockId) {
-      super(e, intLockId);
+    LockInfoExt(ShowLocksResponseElement e) {
+      super(e);
       this.e = e;
     }
   }
@@ -886,7 +886,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         stmt = dbConn.createStatement();
 
         String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
-          "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id from HIVE_LOCKS";
+          "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
+          "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS";
         LOG.debug("Doing to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         while (rs.next()) {
@@ -914,7 +915,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
           e.setUser(rs.getString(10));
           e.setHostname(rs.getString(11));
-          sortedList.add(new LockInfoExt(e, rs.getLong(12)));
+          e.setLockIdInternal(rs.getLong(12));
+          long id = rs.getLong(13);
+          if(!rs.wasNull()) {
+            e.setBlockedByExtId(id);
+          }
+          id = rs.getLong(14);
+          if(!rs.wasNull()) {
+            e.setBlockedByIntId(id);
+          }
+          sortedList.add(new LockInfoExt(e));
         }
         LOG.debug("Going to rollback");
         dbConn.rollback();
@@ -1164,6 +1174,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   private static void shouldNeverHappen(long txnid) {
     throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid));
   }
+  private static void shouldNeverHappen(long txnid, long extLockId, long intLockId) {
+    throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
+      + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
+  }
   public void addDynamicPartitions(AddDynamicPartitions rqst)
       throws NoSuchTxnException,  TxnAbortedException, MetaException {
     Connection dbConn = null;
@@ -1711,15 +1725,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       }
       txnId = rs.getLong("hl_txnid");//returns 0 if value is NULL
     }
-    LockInfo(ShowLocksResponseElement e, long intLockId) {
+    LockInfo(ShowLocksResponseElement e) {
       extLockId = e.getLockid();
-      this.intLockId = intLockId;
+      intLockId = e.getLockIdInternal();
+      txnId = e.getTxnid();
       db = e.getDbname();
       table = e.getTablename();
       partition = e.getPartname();
       state = e.getState();
       type = e.getType();
-      txnId = e.getTxnid();
     }
 
     public boolean equals(Object other) {
@@ -2018,9 +2032,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
     LOG.debug("Going to execute query <" + query.toString() + ">");
     Statement stmt = null;
+    ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
-      ResultSet rs = stmt.executeQuery(query.toString());
+      rs = stmt.executeQuery(query.toString());
       SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
       while (rs.next()) {
         lockSet.add(new LockInfo(rs));
@@ -2091,7 +2106,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           switch (lockAction) {
             case WAIT:
               if(!ignoreConflict(info, locks[i])) {
+                /*we acquire all locks for a given query atomically; if 1 blocks, all go into (or remain in)
+                * Waiting state.  wait() will undo any 'acquire()' which may have happened as part of
+                * this (metastore db) transaction and then we record which lock blocked the lock
+                * we were testing ('info').*/
                 wait(dbConn, save);
+                String sqlText = "update HIVE_LOCKS" +
+                  " set HL_BLOCKEDBY_EXT_ID=" + locks[i].extLockId +
+                  ", HL_BLOCKEDBY_INT_ID=" + locks[i].intLockId +
+                  " where HL_LOCK_EXT_ID=" + info.extLockId + " and HL_LOCK_INT_ID=" + info.intLockId;
+                LOG.debug("Executing sql: " + sqlText);
+                int updCnt = stmt.executeUpdate(sqlText);
+                if(updCnt != 1) {
+                  shouldNeverHappen(info.txnId, info.extLockId, info.intLockId);
+                }
                 LOG.debug("Going to commit");
                 dbConn.commit();
                 response.setState(LockState.WAITING);
@@ -2120,7 +2148,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       dbConn.commit();
       response.setState(LockState.ACQUIRED);
     } finally {
-      closeStmt(stmt);
+      close(rs, stmt, null);
     }
     return response;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 6033c15..4d3c3e1 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1114,7 +1114,7 @@ public class TestTxnHandler {
   }
 
   @Test
-  @Ignore
+  @Ignore("Wedges Derby")
   public void deadlockDetected() throws Exception {
     LOG.debug("Starting deadlock test");
     if (txnHandler instanceof TxnHandler) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 414293c..816d8d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2512,6 +2512,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     os.write(separator);
     os.writeBytes("State");
     os.write(separator);
+    os.writeBytes("Blocked By");
+    os.write(separator);
     os.writeBytes("Type");
     os.write(separator);
     os.writeBytes("Transaction ID");
@@ -2528,7 +2530,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     List<ShowLocksResponseElement> locks = rsp.getLocks();
     if (locks != null) {
       for (ShowLocksResponseElement lock : locks) {
-        os.writeBytes(Long.toString(lock.getLockid()));
+        if(lock.isSetLockIdInternal()) {
+          os.writeBytes(Long.toString(lock.getLockid()) + "." + Long.toString(lock.getLockIdInternal()));
+        }
+        else {
+          os.writeBytes(Long.toString(lock.getLockid()));
+        }
         os.write(separator);
         os.writeBytes(lock.getDbname());
         os.write(separator);
@@ -2538,6 +2545,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         os.write(separator);
         os.writeBytes(lock.getState().toString());
         os.write(separator);
+        if(lock.isSetBlockedByExtId()) {//both "blockedby" are either there or not
+          os.writeBytes(Long.toString(lock.getBlockedByExtId()) + "." + Long.toString(lock.getBlockedByIntId()));
+        }
+        else {
+          os.writeBytes("            ");//12 chars - try to keep cols aligned
+        }
+        os.write(separator);
         os.writeBytes(lock.getType().toString());
         os.write(separator);
         os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid()));

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig
deleted file mode 100644
index af43a07..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig
+++ /dev/null
@@ -1,707 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
-import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
-import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hive.common.util.DateUtils;
-
-public class VectorizedBatchUtil {
-  private static final Log LOG = LogFactory.getLog(VectorizedBatchUtil.class);
-
-  /**
-   * Sets the IsNull value for ColumnVector at specified index
-   * @param cv
-   * @param rowIndex
-   */
-  public static void setNullColIsNullValue(ColumnVector cv, int rowIndex) {
-    cv.isNull[rowIndex] = true;
-    if (cv.noNulls) {
-      cv.noNulls = false;
-    }
-  }
-
-  /**
-   * Iterates thru all the column vectors and sets noNull to
-   * specified value.
-   *
-   * @param batch
-   *          Batch on which noNull is set
-   */
-  public static void setNoNullFields(VectorizedRowBatch batch) {
-    for (int i = 0; i < batch.numCols; i++) {
-      batch.cols[i].noNulls = true;
-    }
-  }
-
-  /**
-   * Iterates thru all the column vectors and sets repeating to
-   * specified column.
-   *
-   */
-  public static void setRepeatingColumn(VectorizedRowBatch batch, int column) {
-    ColumnVector cv = batch.cols[column];
-    cv.isRepeating = true;
-  }
-
-  /**
-   * Reduce the batch size for a vectorized row batch
-   */
-  public static void setBatchSize(VectorizedRowBatch batch, int size) {
-    assert (size <= batch.getMaxSize());
-    batch.size = size;
-  }
-
-  /**
-   * Walk through the object inspector and add column vectors
-   *
-   * @param oi
-   * @param cvList
-   *          ColumnVectors are populated in this list
-   */
-  private static void allocateColumnVector(StructObjectInspector oi,
-      List<ColumnVector> cvList) throws HiveException {
-    if (cvList == null) {
-      throw new HiveException("Null columnvector list");
-    }
-    if (oi == null) {
-      return;
-    }
-    final List<? extends StructField> fields = oi.getAllStructFieldRefs();
-    for(StructField field : fields) {
-      ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
-      switch(fieldObjectInspector.getCategory()) {
-      case PRIMITIVE:
-        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldObjectInspector;
-        switch(poi.getPrimitiveCategory()) {
-        case BOOLEAN:
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-        case TIMESTAMP:
-        case DATE:
-        case INTERVAL_YEAR_MONTH:
-        case INTERVAL_DAY_TIME:
-          cvList.add(new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
-          break;
-        case FLOAT:
-        case DOUBLE:
-          cvList.add(new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
-          break;
-        case BINARY:
-        case STRING:
-        case CHAR:
-        case VARCHAR:
-          cvList.add(new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
-          break;
-        case DECIMAL:
-          DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
-          cvList.add(new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-              tInfo.precision(), tInfo.scale()));
-          break;
-        default:
-          throw new HiveException("Vectorizaton is not supported for datatype:"
-              + poi.getPrimitiveCategory());
-        }
-        break;
-      case STRUCT:
-        throw new HiveException("Struct not supported");
-      default:
-        throw new HiveException("Flattening is not supported for datatype:"
-            + fieldObjectInspector.getCategory());
-      }
-    }
-  }
-
-
-  /**
-   * Create VectorizedRowBatch from ObjectInspector
-   *
-   * @param oi
-   * @return
-   * @throws HiveException
-   */
-  public static VectorizedRowBatch constructVectorizedRowBatch(
-      StructObjectInspector oi) throws HiveException {
-    final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
-    allocateColumnVector(oi, cvList);
-    final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
-    int i = 0;
-    for(ColumnVector cv : cvList) {
-      result.cols[i++] = cv;
-    }
-    return result;
-  }
-
-  /**
-   * Create VectorizedRowBatch from key and value object inspectors
-   * The row object inspector used by ReduceWork needs to be a **standard**
-   * struct object inspector, not just any struct object inspector.
-   * @param keyInspector
-   * @param valueInspector
-   * @param vectorScratchColumnTypeMap
-   * @return VectorizedRowBatch, OI
-   * @throws HiveException
-   */
-  public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
-      StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
-          throws HiveException {
-
-    ArrayList<String> colNames = new ArrayList<String>();
-    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-    List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
-    for (StructField field: fields) {
-      colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-      ois.add(field.getFieldObjectInspector());
-    }
-    fields = valueInspector.getAllStructFieldRefs();
-    for (StructField field: fields) {
-      colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-      ois.add(field.getFieldObjectInspector());
-    }
-    StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
-
-    VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
-    batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
-    return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
-  }
-
-  /**
-   * Iterates through all columns in a given row and populates the batch
-   *
-   * @param row
-   * @param oi
-   * @param rowIndex
-   * @param batch
-   * @param buffer
-   * @throws HiveException
-   */
-  public static void addRowToBatch(Object row, StructObjectInspector oi,
-          int rowIndex,
-          VectorizedRowBatch batch,
-          DataOutputBuffer buffer
-          ) throws HiveException {
-    addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
-  }
-
-  /**
-   * Iterates thru all the columns in a given row and populates the batch
-   * from a given offset
-   *
-   * @param row Deserialized row object
-   * @param oi Object insepector for that row
-   * @param rowIndex index to which the row should be added to batch
-   * @param colOffset offset from where the column begins
-   * @param batch Vectorized batch to which the row is added at rowIndex
-   * @throws HiveException
-   */
-  public static void addRowToBatchFrom(Object row, StructObjectInspector oi,
-                                   int rowIndex,
-                                   int colOffset,
-                                   VectorizedRowBatch batch,
-                                   DataOutputBuffer buffer
-                                   ) throws HiveException {
-    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
-    final int off = colOffset;
-    // Iterate thru the cols and load the batch
-    for (int i = 0; i < fieldRefs.size(); i++) {
-      setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, off);
-    }
-  }
-
-  /**
-   * Add only the projected column of a regular row to the specified vectorized row batch
-   * @param row the regular row
-   * @param oi object inspector for the row
-   * @param rowIndex the offset to add in the batch
-   * @param batch vectorized row batch
-   * @param buffer data output buffer
-   * @throws HiveException
-   */
-  public static void addProjectedRowToBatchFrom(Object row, StructObjectInspector oi,
-      int rowIndex, VectorizedRowBatch batch, DataOutputBuffer buffer) throws HiveException {
-    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
-    for (int i = 0; i < fieldRefs.size(); i++) {
-      int projectedOutputCol = batch.projectedColumns[i];
-      if (batch.cols[projectedOutputCol] == null) {
-        continue;
-      }
-      setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, projectedOutputCol, 0);
-    }
-  }
-  /**
-   * Iterates thru all the columns in a given row and populates the batch
-   * from a given offset
-   *
-   * @param row Deserialized row object
-   * @param oi Object insepector for that row
-   * @param rowIndex index to which the row should be added to batch
-   * @param batch Vectorized batch to which the row is added at rowIndex
-   * @param context context object for this vectorized batch
-   * @param buffer
-   * @throws HiveException
-   */
-  public static void acidAddRowToBatch(Object row,
-                                       StructObjectInspector oi,
-                                       int rowIndex,
-                                       VectorizedRowBatch batch,
-                                       VectorizedRowBatchCtx context,
-                                       DataOutputBuffer buffer) throws HiveException {
-    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
-    // Iterate thru the cols and load the batch
-    for (int i = 0; i < fieldRefs.size(); i++) {
-      if (batch.cols[i] == null) {
-        // This means the column was not included in the projection from the underlying read
-        continue;
-      }
-      if (context.isPartitionCol(i)) {
-        // The value will have already been set before we're called, so don't overwrite it
-        continue;
-      }
-      setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, 0);
-    }
-  }
-
-  private static void setVector(Object row,
-                                StructObjectInspector oi,
-                                StructField field,
-                                VectorizedRowBatch batch,
-                                DataOutputBuffer buffer,
-                                int rowIndex,
-                                int colIndex,
-                                int offset) throws HiveException {
-
-    Object fieldData = oi.getStructFieldData(row, field);
-    ObjectInspector foi = field.getFieldObjectInspector();
-
-    // Vectorization only supports PRIMITIVE data types. Assert the same
-    assert (foi.getCategory() == Category.PRIMITIVE);
-
-    // Get writable object
-    PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
-    Object writableCol = poi.getPrimitiveWritableObject(fieldData);
-
-    // NOTE: The default value for null fields in vectorization is 1 for int types, NaN for
-    // float/double. String types have no default value for null.
-    switch (poi.getPrimitiveCategory()) {
-    case BOOLEAN: {
-      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        lcv.vector[rowIndex] = ((BooleanWritable) writableCol).get() ? 1 : 0;
-        lcv.isNull[rowIndex] = false;
-      } else {
-        lcv.vector[rowIndex] = 1;
-        setNullColIsNullValue(lcv, rowIndex);
-      }
-    }
-      break;
-    case BYTE: {
-      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        lcv.vector[rowIndex] = ((ByteWritable) writableCol).get();
-        lcv.isNull[rowIndex] = false;
-      } else {
-        lcv.vector[rowIndex] = 1;
-        setNullColIsNullValue(lcv, rowIndex);
-      }
-    }
-      break;
-    case SHORT: {
-      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
-        lcv.isNull[rowIndex] = false;
-      } else {
-        lcv.vector[rowIndex] = 1;
-        setNullColIsNullValue(lcv, rowIndex);
-      }
-    }
-      break;
-    case INT: {
-      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
-        lcv.isNull[rowIndex] = false;
-      } else {
-        lcv.vector[rowIndex] = 1;
-        setNullColIsNullValue(lcv, rowIndex);
-      }
-    }
-      break;
-    case LONG: {
-      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
-        lcv.isNull[rowIndex] = false;
-      } else {
-        lcv.vector[rowIndex] = 1;
-        setNullColIsNullValue(lcv, rowIndex);
-      }
-    }
-      break;
-    case DATE: {
-      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        lcv.vector[rowIndex] = ((DateWritable) writableCol).getDays();
-        lcv.isNull[rowIndex] = false;
-      } else {
-        lcv.vector[rowIndex] = 1;
-        setNullColIsNullValue(lcv, rowIndex);
-      }
-    }
-      break;
-    case FLOAT: {
-      DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
-        dcv.isNull[rowIndex] = false;
-      } else {
-        dcv.vector[rowIndex] = Double.NaN;
-        setNullColIsNullValue(dcv, rowIndex);
-      }
-    }
-      break;
-    case DOUBLE: {
-      DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
-        dcv.isNull[rowIndex] = false;
-      } else {
-        dcv.vector[rowIndex] = Double.NaN;
-        setNullColIsNullValue(dcv, rowIndex);
-      }
-    }
-      break;
-    case TIMESTAMP: {
-      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        Timestamp t = ((TimestampWritable) writableCol).getTimestamp();
-        lcv.vector[rowIndex] = TimestampUtils.getTimeNanoSec(t);
-        lcv.isNull[rowIndex] = false;
-      } else {
-        lcv.vector[rowIndex] = 1;
-        setNullColIsNullValue(lcv, rowIndex);
-      }
-    }
-      break;
-    case INTERVAL_YEAR_MONTH: {
-      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        HiveIntervalYearMonth i = ((HiveIntervalYearMonthWritable) writableCol).getHiveIntervalYearMonth();
-        lcv.vector[rowIndex] = i.getTotalMonths();
-        lcv.isNull[rowIndex] = false;
-      } else {
-        lcv.vector[rowIndex] = 1;
-        setNullColIsNullValue(lcv, rowIndex);
-      }
-    }
-      break;
-    case INTERVAL_DAY_TIME: {
-      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        HiveIntervalDayTime i = ((HiveIntervalDayTimeWritable) writableCol).getHiveIntervalDayTime();
-        lcv.vector[rowIndex] = DateUtils.getIntervalDayTimeTotalNanos(i);
-        lcv.isNull[rowIndex] = false;
-      } else {
-        lcv.vector[rowIndex] = 1;
-        setNullColIsNullValue(lcv, rowIndex);
-      }
-    }
-      break;
-    case BINARY: {
-      BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-          bcv.isNull[rowIndex] = false;
-          BytesWritable bw = (BytesWritable) writableCol;
-          byte[] bytes = bw.getBytes();
-          int start = buffer.getLength();
-          int length = bw.getLength();
-          try {
-            buffer.write(bytes, 0, length);
-          } catch (IOException ioe) {
-            throw new IllegalStateException("bad write", ioe);
-          }
-          bcv.setRef(rowIndex, buffer.getData(), start, length);
-      } else {
-        setNullColIsNullValue(bcv, rowIndex);
-      }
-    }
-      break;
-    case STRING: {
-      BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        bcv.isNull[rowIndex] = false;
-        Text colText = (Text) writableCol;
-        int start = buffer.getLength();
-        int length = colText.getLength();
-        try {
-          buffer.write(colText.getBytes(), 0, length);
-        } catch (IOException ioe) {
-          throw new IllegalStateException("bad write", ioe);
-        }
-        bcv.setRef(rowIndex, buffer.getData(), start, length);
-      } else {
-        setNullColIsNullValue(bcv, rowIndex);
-      }
-    }
-      break;
-    case CHAR: {
-      BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        bcv.isNull[rowIndex] = false;
-        HiveChar colHiveChar = ((HiveCharWritable) writableCol).getHiveChar();
-        byte[] bytes = colHiveChar.getStrippedValue().getBytes();
-
-        // We assume the CHAR maximum length was enforced when the object was created.
-        int length = bytes.length;
-
-        int start = buffer.getLength();
-        try {
-          // In vector mode, we store CHAR as unpadded.
-          buffer.write(bytes, 0, length);
-        } catch (IOException ioe) {
-          throw new IllegalStateException("bad write", ioe);
-        }
-        bcv.setRef(rowIndex, buffer.getData(), start, length);
-      } else {
-        setNullColIsNullValue(bcv, rowIndex);
-      }
-    }
-      break;
-    case VARCHAR: {
-        BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
-        if (writableCol != null) {
-          bcv.isNull[rowIndex] = false;
-          HiveVarchar colHiveVarchar = ((HiveVarcharWritable) writableCol).getHiveVarchar();
-          byte[] bytes = colHiveVarchar.getValue().getBytes();
-
-          // We assume the VARCHAR maximum length was enforced when the object was created.
-          int length = bytes.length;
-
-          int start = buffer.getLength();
-          try {
-            buffer.write(bytes, 0, length);
-          } catch (IOException ioe) {
-            throw new IllegalStateException("bad write", ioe);
-          }
-          bcv.setRef(rowIndex, buffer.getData(), start, length);
-        } else {
-          setNullColIsNullValue(bcv, rowIndex);
-        }
-      }
-        break;
-    case DECIMAL:
-      DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[offset + colIndex];
-      if (writableCol != null) {
-        dcv.isNull[rowIndex] = false;
-        HiveDecimalWritable wobj = (HiveDecimalWritable) writableCol;
-        dcv.set(rowIndex, wobj);
-      } else {
-        setNullColIsNullValue(dcv, rowIndex);
-      }
-      break;
-    default:
-      throw new HiveException("Vectorizaton is not supported for datatype:" +
-          poi.getPrimitiveCategory());
-    }
-  }
-
-  public static StandardStructObjectInspector convertToStandardStructObjectInspector(
-      StructObjectInspector structObjectInspector) throws HiveException {
-
-    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
-    List<ObjectInspector> oids = new ArrayList<ObjectInspector>();
-    ArrayList<String> columnNames = new ArrayList<String>();
-
-    for(StructField field : fields) {
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
-          field.getFieldObjectInspector().getTypeName());
-      ObjectInspector standardWritableObjectInspector =
-              TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
-      oids.add(standardWritableObjectInspector);
-      columnNames.add(field.getFieldName());
-    }
-    return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids);
-  }
-
-  public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector(
-      StructObjectInspector structObjectInspector) throws HiveException {
-
-    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
-    PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()];
-
-    int i = 0;
-    for(StructField field : fields) {
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
-          field.getFieldObjectInspector().getTypeName());
-      result[i++] =  (PrimitiveTypeInfo) typeInfo;
-    }
-    return result;
-  }
-
-  public static PrimitiveTypeInfo[] primitiveTypeInfosFromTypeNames(
-      String[] typeNames) throws HiveException {
-
-    PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[typeNames.length];
-
-    for(int i = 0; i < typeNames.length; i++) {
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]);
-      result[i] =  (PrimitiveTypeInfo) typeInfo;
-    }
-    return result;
-  }
-
-  /**
-   * Make a new (scratch) batch, which is exactly "like" the batch provided, except that it's empty
-   * @param batch the batch to imitate
-   * @return the new batch
-   * @throws HiveException
-   */
-  public static VectorizedRowBatch makeLike(VectorizedRowBatch batch) throws HiveException {
-    VectorizedRowBatch newBatch = new VectorizedRowBatch(batch.numCols);
-    for (int i = 0; i < batch.numCols; i++) {
-      ColumnVector colVector = batch.cols[i];
-      if (colVector != null) {
-        ColumnVector newColVector;
-        if (colVector instanceof LongColumnVector) {
-          newColVector = new LongColumnVector();
-        } else if (colVector instanceof DoubleColumnVector) {
-          newColVector = new DoubleColumnVector();
-        } else if (colVector instanceof BytesColumnVector) {
-          newColVector = new BytesColumnVector();
-        } else if (colVector instanceof DecimalColumnVector) {
-          DecimalColumnVector decColVector = (DecimalColumnVector) colVector;
-          newColVector = new DecimalColumnVector(decColVector.precision, decColVector.scale);
-        } else {
-          throw new HiveException("Column vector class " + colVector.getClass().getName() +
-          " is not supported!");
-        }
-        newBatch.cols[i] = newColVector;
-        newBatch.cols[i].init();
-      }
-    }
-    newBatch.projectedColumns = Arrays.copyOf(batch.projectedColumns, batch.projectedColumns.length);
-    newBatch.projectionSize = batch.projectionSize;
-    newBatch.reset();
-    return newBatch;
-  }
-
-  public static String displayBytes(byte[] bytes, int start, int length) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = start; i < start + length; i++) {
-      char ch = (char) bytes[i];
-      if (ch < ' ' || ch > '~') {
-        sb.append(String.format("\\%03d", bytes[i] & 0xff));
-      } else {
-        sb.append(ch);
-      }
-    }
-    return sb.toString();
-  }
-
-  public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
-    StringBuilder sb = new StringBuilder();
-    sb.append(prefix + " row " + index + " ");
-    for (int column = 0; column < batch.cols.length; column++) {
-      ColumnVector colVector = batch.cols[column];
-      if (colVector == null) {
-        sb.append("(null colVector " + column + ")");
-      } else {
-        boolean isRepeating = colVector.isRepeating;
-        index = (isRepeating ? 0 : index);
-        if (colVector.noNulls || !colVector.isNull[index]) {
-          if (colVector instanceof LongColumnVector) {
-            sb.append(((LongColumnVector) colVector).vector[index]);
-          } else if (colVector instanceof DoubleColumnVector) {
-            sb.append(((DoubleColumnVector) colVector).vector[index]);
-          } else if (colVector instanceof BytesColumnVector) {
-            BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
-            byte[] bytes = bytesColumnVector.vector[index];
-            int start = bytesColumnVector.start[index];
-            int length = bytesColumnVector.length[index];
-            if (bytes == null) {
-              sb.append("(Unexpected null bytes with start " + start + " length " + length + ")");
-            } else {
-              sb.append("bytes: '" + displayBytes(bytes, start, length) + "'");
-            }
-          } else if (colVector instanceof DecimalColumnVector) {
-            sb.append(((DecimalColumnVector) colVector).vector[index].toString());
-          } else {
-            sb.append("Unknown");
-          }
-        } else {
-          sb.append("NULL");
-        }
-      }
-      sb.append(" ");
-    }
-    LOG.info(sb.toString());
-  }
-
-  public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {
-    for (int i = 0; i < batch.size; i++) {
-      int index = (batch.selectedInUse ? batch.selected[i] : i);
-      debugDisplayOneRow(batch, index, prefix);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig
deleted file mode 100644
index aacfa92..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.parse;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.antlr.runtime.tree.Tree;
-import org.apache.commons.httpclient.util.URIUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.FileFormatException;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.mapred.InputFormat;
-
-/**
- * LoadSemanticAnalyzer.
- *
- */
-public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
-
-  public LoadSemanticAnalyzer(HiveConf conf) throws SemanticException {
-    super(conf);
-  }
-
-  public static FileStatus[] matchFilesOrDir(FileSystem fs, Path path)
-      throws IOException {
-    FileStatus[] srcs = fs.globStatus(path, new PathFilter() {
-      @Override
-      public boolean accept(Path p) {
-        String name = p.getName();
-        return name.equals("_metadata") ? true : !name.startsWith("_") && !name.startsWith(".");
-      }
-    });
-    if ((srcs != null) && srcs.length == 1) {
-      if (srcs[0].isDir()) {
-        srcs = fs.listStatus(srcs[0].getPath(), new PathFilter() {
-          @Override
-          public boolean accept(Path p) {
-            String name = p.getName();
-            return !name.startsWith("_") && !name.startsWith(".");
-          }
-        });
-      }
-    }
-    return (srcs);
-  }
-
-  private URI initializeFromURI(String fromPath, boolean isLocal) throws IOException,
-      URISyntaxException {
-    URI fromURI = new Path(fromPath).toUri();
-
-    String fromScheme = fromURI.getScheme();
-    String fromAuthority = fromURI.getAuthority();
-    String path = fromURI.getPath();
-
-    // generate absolute path relative to current directory or hdfs home
-    // directory
-    if (!path.startsWith("/")) {
-      if (isLocal) {
-        path = URIUtil.decode(
-            new Path(System.getProperty("user.dir"), fromPath).toUri().toString());
-      } else {
-        path = new Path(new Path("/user/" + System.getProperty("user.name")),
-          path).toString();
-      }
-    }
-
-    // set correct scheme and authority
-    if (StringUtils.isEmpty(fromScheme)) {
-      if (isLocal) {
-        // file for local
-        fromScheme = "file";
-      } else {
-        // use default values from fs.default.name
-        URI defaultURI = FileSystem.get(conf).getUri();
-        fromScheme = defaultURI.getScheme();
-        fromAuthority = defaultURI.getAuthority();
-      }
-    }
-
-    // if scheme is specified but not authority then use the default authority
-    if ((!fromScheme.equals("file")) && StringUtils.isEmpty(fromAuthority)) {
-      URI defaultURI = FileSystem.get(conf).getUri();
-      fromAuthority = defaultURI.getAuthority();
-    }
-
-    LOG.debug(fromScheme + "@" + fromAuthority + "@" + path);
-    return new URI(fromScheme, fromAuthority, path, null, null);
-  }
-
-  private FileStatus[] applyConstraintsAndGetFiles(URI fromURI, URI toURI, Tree ast,
-      boolean isLocal) throws SemanticException {
-
-    FileStatus[] srcs = null;
-
-    // local mode implies that scheme should be "file"
-    // we can change this going forward
-    if (isLocal && !fromURI.getScheme().equals("file")) {
-      throw new SemanticException(ErrorMsg.ILLEGAL_PATH.getMsg(ast,
-          "Source file system should be \"file\" if \"local\" is specified"));
-    }
-
-    try {
-      srcs = matchFilesOrDir(FileSystem.get(fromURI, conf), new Path(fromURI));
-      if (srcs == null || srcs.length == 0) {
-        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
-            "No files matching path " + fromURI));
-      }
-
-      for (FileStatus oneSrc : srcs) {
-        if (oneSrc.isDir()) {
-          throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
-              "source contains directory: " + oneSrc.getPath().toString()));
-        }
-      }
-    } catch (IOException e) {
-      // Has to use full name to make sure it does not conflict with
-      // org.apache.commons.lang.StringUtils
-      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e);
-    }
-
-    return srcs;
-  }
-
-  @Override
-  public void analyzeInternal(ASTNode ast) throws SemanticException {
-    boolean isLocal = false;
-    boolean isOverWrite = false;
-    Tree fromTree = ast.getChild(0);
-    Tree tableTree = ast.getChild(1);
-
-    if (ast.getChildCount() == 4) {
-      isLocal = true;
-      isOverWrite = true;
-    }
-
-    if (ast.getChildCount() == 3) {
-      if (ast.getChild(2).getText().toLowerCase().equals("local")) {
-        isLocal = true;
-      } else {
-        isOverWrite = true;
-      }
-    }
-
-    // initialize load path
-    URI fromURI;
-    try {
-      String fromPath = stripQuotes(fromTree.getText());
-      fromURI = initializeFromURI(fromPath, isLocal);
-    } catch (IOException e) {
-      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
-          .getMessage()), e);
-    } catch (URISyntaxException e) {
-      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
-          .getMessage()), e);
-    }
-
-    // initialize destination table/partition
-    TableSpec ts = new TableSpec(db, conf, (ASTNode) tableTree);
-
-    if (ts.tableHandle.isOffline()){
-      throw new SemanticException(
-          ErrorMsg.OFFLINE_TABLE_OR_PARTITION.getMsg(":Table " + ts.tableName));
-    }
-
-    if (ts.tableHandle.isView()) {
-      throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
-    }
-    if (ts.tableHandle.isNonNative()) {
-      throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
-    }
-
-    if(ts.tableHandle.isStoredAsSubDirectories()) {
-      throw new SemanticException(ErrorMsg.LOAD_INTO_STORED_AS_DIR.getMsg());
-    }
-
-    URI toURI = ((ts.partHandle != null) ? ts.partHandle.getDataLocation()
-        : ts.tableHandle.getDataLocation()).toUri();
-
-    List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
-    if ((parts != null && parts.size() > 0)
-        && (ts.partSpec == null || ts.partSpec.size() == 0)) {
-      throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
-    }
-
-    // make sure the arguments make sense
-    FileStatus[] files = applyConstraintsAndGetFiles(fromURI, toURI, fromTree, isLocal);
-
-    // for managed tables, make sure the file formats match
-    if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType())) {
-      ensureFileFormatsMatch(ts, files);
-    }
-    inputs.add(toReadEntity(new Path(fromURI)));
-    Task<? extends Serializable> rTask = null;
-
-    // create final load/move work
-
-    boolean preservePartitionSpecs = false;
-
-    Map<String, String> partSpec = ts.getPartSpec();
-    if (partSpec == null) {
-      partSpec = new LinkedHashMap<String, String>();
-      outputs.add(new WriteEntity(ts.tableHandle,
-          (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
-              WriteEntity.WriteType.INSERT)));
-    } else {
-      try{
-        Partition part = Hive.get().getPartition(ts.tableHandle, partSpec, false);
-        if (part != null) {
-          if (part.isOffline()) {
-            throw new SemanticException(ErrorMsg.OFFLINE_TABLE_OR_PARTITION.
-                getMsg(ts.tableName + ":" + part.getName()));
-          }
-          if (isOverWrite){
-            outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT_OVERWRITE));
-          } else {
-            outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT));
-            // If partition already exists and we aren't overwriting it, then respect
-            // its current location info rather than picking it from the parent TableDesc
-            preservePartitionSpecs = true;
-          }
-        } else {
-          outputs.add(new WriteEntity(ts.tableHandle,
-          (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
-              WriteEntity.WriteType.INSERT)));
-        }
-      } catch(HiveException e) {
-        throw new SemanticException(e);
-      }
-    }
-
-
-    LoadTableDesc loadTableWork;
-    loadTableWork = new LoadTableDesc(new Path(fromURI),
-      Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
-    if (preservePartitionSpecs){
-      // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but
-      // but preservePartitionSpecs=false(default) here is not sufficient enough
-      // info to set inheritTableSpecs=true
-      loadTableWork.setInheritTableSpecs(false);
-    }
-
-    Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(),
-        getOutputs(), loadTableWork, null, true, isLocal), conf);
-    if (rTask != null) {
-      rTask.addDependentTask(childTask);
-    } else {
-      rTask = childTask;
-    }
-
-    rootTasks.add(rTask);
-
-    // The user asked for stats to be collected.
-    // Some stats like number of rows require a scan of the data
-    // However, some other stats, like number of files, do not require a complete scan
-    // Update the stats which do not require a complete scan.
-    Task<? extends Serializable> statTask = null;
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
-      StatsWork statDesc = new StatsWork(loadTableWork);
-      statDesc.setNoStatsAggregator(true);
-      statDesc.setClearAggregatorStats(true);
-      statDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
-      statTask = TaskFactory.get(statDesc, conf);
-    }
-
-    // HIVE-3334 has been filed for load file with index auto update
-    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
-      IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, getInputs(), conf);
-      try {
-        List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater.generateUpdateTasks();
-
-        for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
-          //LOAD DATA will either have a copy & move or just a move,
-          // we always want the update to be dependent on the move
-          childTask.addDependentTask(updateTask);
-          if (statTask != null) {
-            updateTask.addDependentTask(statTask);
-          }
-        }
-      } catch (HiveException e) {
-        console.printInfo("WARNING: could not auto-update stale indexes, indexes are not out of sync");
-      }
-    }
-    else if (statTask != null) {
-      childTask.addDependentTask(statTask);
-    }
-  }
-
-  private void ensureFileFormatsMatch(TableSpec ts, FileStatus[] fileStatuses) throws SemanticException {
-    final Class<? extends InputFormat> destInputFormat;
-    try {
-      if (ts.getPartSpec() == null || ts.getPartSpec().isEmpty()) {
-        destInputFormat = ts.tableHandle.getInputFormatClass();
-      } else {
-        destInputFormat = ts.partHandle.getInputFormatClass();
-      }
-    } catch (HiveException e) {
-      throw new SemanticException(e);
-    }
-
-    // Other file formats should do similar check to make sure file formats match
-    // when doing LOAD DATA .. INTO TABLE
-    if (OrcInputFormat.class.equals(destInputFormat)) {
-      for (FileStatus fileStatus : fileStatuses) {
-        try {
-          Path filePath = fileStatus.getPath();
-          FileSystem fs = FileSystem.get(filePath.toUri(), conf);
-          // just creating orc reader is going to do sanity checks to make sure its valid ORC file
-          OrcFile.createReader(fs, filePath);
-        } catch (FileFormatException e) {
-          throw new SemanticException(ErrorMsg.INVALID_FILE_FORMAT_IN_LOAD.getMsg("Destination" +
-              " table is stored as ORC but the file being loaded is not a valid ORC file."));
-        } catch (IOException e) {
-          throw new SemanticException("Unable to load data to destination table." +
-              " Error: " + e.getMessage());
-        }
-      }
-    }
-  }
-}