You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 20:42:54 UTC

[04/51] [partial] hive git commit: Revert "HIVE-14671 : merge master into hive-14535 (Wei Zheng)"

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index b018adb..0b1ac4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -37,7 +37,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,8 +45,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.Driver.DriverState;
-import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -366,12 +363,8 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
     Map<CombinePathInputFormat, CombineFilter> poolMap =
       new HashMap<CombinePathInputFormat, CombineFilter>();
     Set<Path> poolSet = new HashSet<Path>();
-    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
 
     for (Path path : paths) {
-      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
-        throw new IOException("Operation is Canceled. ");
-
       PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
           pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
       TableDesc tableDesc = part.getTableDesc();

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index f73a8e3..cc77e4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -87,7 +87,7 @@ public final class HiveFileFormatUtils {
   public static class FileChecker {
     // we don't have many file formats that implement InputFormatChecker. We won't be holding
     // multiple instances of such classes
-    private static final int MAX_CACHE_SIZE = 16;
+    private static int MAX_CACHE_SIZE = 16;
 
     // immutable maps
     Map<Class<? extends InputFormat>, Class<? extends InputFormatChecker>> inputFormatCheckerMap;

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 9b83cb4..c697407 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -89,13 +89,14 @@ import org.apache.hive.common.util.ReflectionUtil;
  */
 public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     implements InputFormat<K, V>, JobConfigurable {
+
   private static final String CLASS_NAME = HiveInputFormat.class.getName();
   private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   /**
    * A cache of InputFormat instances.
    */
-  private static final Map<Class, InputFormat<WritableComparable, Writable>> inputFormats
+  private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats
     = new ConcurrentHashMap<Class, InputFormat<WritableComparable, Writable>>();
 
   private JobConf job;

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
index f41edc4..d391164 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
@@ -839,7 +839,7 @@ public class RCFile {
     // the max size of memory for buffering records before writes them out
     private int columnsBufferSize = 4 * 1024 * 1024; // 4M
     // the conf string for COLUMNS_BUFFER_SIZE
-    public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+    public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
 
     // how many records already buffered
     private int bufferedRecords = 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index cbd38ed..96ca736 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.orc.FileMetadata;
 import org.apache.orc.PhysicalWriter;
-import org.apache.orc.MemoryManager;
+import org.apache.orc.impl.MemoryManager;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.OrcTail;
 
@@ -258,7 +258,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
     /**
      * A package local option to set the memory manager.
      */
-    public WriterOptions memory(MemoryManager value) {
+    protected WriterOptions memory(MemoryManager value) {
       super.memory(value);
       return this;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8fb7211..59682db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -158,7 +158,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(OrcInputFormat.class);
-  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+  private static boolean isDebugEnabled = LOG.isDebugEnabled();
   static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
@@ -1531,7 +1531,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       Reader.Options readerOptions = new Reader.Options(context.conf);
       if (readerTypes == null) {
         readerIncluded = genIncludedColumns(fileSchema, context.conf);
-        evolution = new SchemaEvolution(fileSchema, null, readerOptions.include(readerIncluded));
+        evolution = new SchemaEvolution(fileSchema, readerOptions.include(readerIncluded));
       } else {
         // The reader schema always comes in without ACID columns.
         TypeDescription readerSchema = OrcUtils.convertTypeFromProtobuf(readerTypes, 0);
@@ -1913,6 +1913,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
+  // The schema type description does not include the ACID fields (i.e. it is the
+  // non-ACID original schema).
+  private static boolean SCHEMA_TYPES_IS_ORIGINAL = true;
+
   @Override
   public RowReader<OrcStruct> getReader(InputSplit inputSplit,
                                         Options options)

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 5b2e9b5..0ac3ec5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -18,12 +18,9 @@
 package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.IdentityHashMap;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -46,13 +43,11 @@ import org.apache.orc.impl.RecordReaderUtils;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.BufferChunk;
+import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.orc.OrcProto;
 
-import sun.misc.Cleaner;
-
-
 /**
  * Encoded reader implementation.
  *
@@ -85,17 +80,6 @@ import sun.misc.Cleaner;
  */
 class EncodedReaderImpl implements EncodedReader {
   public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class);
-  private static Field cleanerField;
-  static {
-    try {
-      // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
-      final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
-      cleanerField = dbClazz.getDeclaredField("cleaner");
-      cleanerField.setAccessible(true);
-    } catch (Throwable t) {
-      cleanerField = null;
-    }
-  }
   private static final Object POOLS_CREATION_LOCK = new Object();
   private static Pools POOLS;
   private static class Pools {
@@ -220,8 +204,8 @@ class EncodedReaderImpl implements EncodedReader {
 
   @Override
   public void readEncodedColumns(int stripeIx, StripeInformation stripe,
-      OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings,
-      List<OrcProto.Stream> streamList, boolean[] included, boolean[][] colRgs,
+      OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, List<OrcProto.Stream> streamList,
+      boolean[] included, boolean[][] colRgs,
       Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
     // Note: for now we don't have to setError here, caller will setError if we throw.
     // We are also not supposed to call setDone, since we are only part of the operation.
@@ -319,35 +303,15 @@ class EncodedReaderImpl implements EncodedReader {
       }
     }
 
-    // TODO: the memory release could be optimized - we could release original buffers after we
-    //       are fully done with each original buffer from disk. For now release all at the end;
-    //       it doesn't increase the total amount of memory we hold, just the duration a bit.
-    //       This is much simpler - we can just remember original ranges after reading them, and
-    //       release them at the end. In a few cases where it's easy to determine that a buffer
-    //       can be freed in advance, we remove it from the map.
-    IdentityHashMap<ByteBuffer, Boolean> toRelease = null;
     if (!isAllInCache.value) {
       if (!isDataReaderOpen) {
         this.dataReader.open();
         isDataReaderOpen = true;
       }
       dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
-      toRelease = new IdentityHashMap<>();
-      DiskRangeList drl = toRead.next;
-      while (drl != null) {
-        if (drl instanceof BufferChunk) {
-          toRelease.put(drl.getData(), true);
-        }
-        drl = drl.next;
-      }
     }
 
     // 3. For uncompressed case, we need some special processing before read.
-    //    Basically, we are trying to create artificial, consistent ranges to cache, as there are
-    //    no CBs in an uncompressed file. At the end of this processing, the list would contain
-    //    either cache buffers, or buffers allocated by us and not cached (if we are only reading
-    //    parts of the data for some ranges and don't want to cache it). Both are represented by
-    //    CacheChunks, so the list is just CacheChunk-s from that point on.
     DiskRangeList iter = toRead.next;  // Keep "toRead" list for future use, don't extract().
     if (codec == null) {
       for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
@@ -362,12 +326,6 @@ class EncodedReaderImpl implements EncodedReader {
           }
         }
       }
-      // Release buffers as we are done with all the streams... also see toRelease comment.\
-      // With uncompressed streams, we know we are done earlier.
-      if (toRelease != null) {
-        releaseBuffers(toRelease.keySet(), true);
-        toRelease = null;
-      }
       if (isTracingEnabled) {
         LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset "
             + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -416,8 +374,8 @@ class EncodedReaderImpl implements EncodedReader {
               if (sctx.stripeLevelStream == null) {
                 sctx.stripeLevelStream = POOLS.csdPool.take();
                 // We will be using this for each RG while also sending RGs to processing.
-                // To avoid buffers being unlocked, run refcount one ahead; so each RG 
-                 // processing will decref once, and the
+                // To avoid buffers being unlocked, run refcount one ahead; we will not increase
+                // it when building the last RG, so each RG processing will decref once, and the
                 // last one will unlock the buffers.
                 sctx.stripeLevelStream.incRef();
                 // For stripe-level streams we don't need the extra refcount on the block.
@@ -425,12 +383,14 @@ class EncodedReaderImpl implements EncodedReader {
                 long unlockUntilCOffset = sctx.offset + sctx.length;
                 DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
                     sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
-                    unlockUntilCOffset, sctx.offset, toRelease);
+                    unlockUntilCOffset, sctx.offset);
                 if (lastCached != null) {
                   iter = lastCached;
                 }
               }
-              sctx.stripeLevelStream.incRef();
+              if (!isLastRg) {
+                sctx.stripeLevelStream.incRef();
+              }
               cb = sctx.stripeLevelStream;
             } else {
               // This stream can be separated by RG using index. Let's do that.
@@ -451,7 +411,7 @@ class EncodedReaderImpl implements EncodedReader {
               boolean isStartOfStream = sctx.bufferIter == null;
               DiskRangeList lastCached = readEncodedStream(stripeOffset,
                   (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
-                  unlockUntilCOffset, sctx.offset, toRelease);
+                  unlockUntilCOffset, sctx.offset);
               if (lastCached != null) {
                 sctx.bufferIter = iter = lastCached;
               }
@@ -477,27 +437,7 @@ class EncodedReaderImpl implements EncodedReader {
     }
 
     // Release the unreleased buffers. See class comment about refcounts.
-    for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
-      ColumnReadContext ctx = colCtxs[colIx];
-      if (ctx == null) continue; // This column is not included.
-      for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
-        StreamContext sctx = ctx.streams[streamIx];
-        if (sctx == null || sctx.stripeLevelStream == null) continue;
-        if (0 != sctx.stripeLevelStream.decRef()) continue;
-        for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Unlocking {} at the end of processing", buf);
-          }
-          cacheWrapper.releaseBuffer(buf);
-        }
-      }
-    }
-
     releaseInitialRefcounts(toRead.next);
-    // Release buffers as we are done with all the streams... also see toRelease comment.
-    if (toRelease != null) {
-      releaseBuffers(toRelease.keySet(), true);
-    }
     releaseCacheChunksIntoObjectPool(toRead.next);
   }
 
@@ -665,8 +605,8 @@ class EncodedReaderImpl implements EncodedReader {
    *         the master list, so they are safe to keep as iterators for various streams.
    */
   public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
-      long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset,
-      IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException {
+      long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset)
+          throws IOException {
     if (csd.getCacheBuffers() == null) {
       csd.setCacheBuffers(new ArrayList<MemoryBuffer>());
     } else {
@@ -675,10 +615,10 @@ class EncodedReaderImpl implements EncodedReader {
     if (cOffset == endCOffset) return null;
     boolean isCompressed = codec != null;
     List<ProcCacheChunk> toDecompress = null;
+    List<ByteBuffer> toRelease = null;
     List<IncompleteCb> badEstimates = null;
-    List<ByteBuffer> toReleaseCopies = null;
     if (isCompressed) {
-      toReleaseCopies = new ArrayList<>();
+      toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
       toDecompress = new ArrayList<>();
       badEstimates = new ArrayList<>();
     }
@@ -696,8 +636,8 @@ class EncodedReaderImpl implements EncodedReader {
     // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
     try {
       lastUncompressed = isCompressed ?
-          prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, unlockUntilCOffset,
-              current, csd, toRelease, toReleaseCopies, toDecompress, badEstimates)
+          prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
+              unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
         : prepareRangesForUncompressedRead(
             cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
     } catch (Exception ex) {
@@ -717,10 +657,7 @@ class EncodedReaderImpl implements EncodedReader {
       assert result == null; // We don't expect conflicts from bad estimates.
     }
 
-    if (toDecompress == null || toDecompress.isEmpty()) {
-      releaseBuffers(toReleaseCopies, false);
-      return lastUncompressed; // Nothing to do.
-    }
+    if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
 
     // 3. Allocate the buffers, prepare cache keys.
     // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
@@ -753,18 +690,21 @@ class EncodedReaderImpl implements EncodedReader {
       cacheWrapper.reuseBuffer(chunk.getBuffer());
     }
 
-    // 5. Release the copies we made directly to the cleaner.
-    releaseBuffers(toReleaseCopies, false);
+    // 5. Release original compressed buffers to zero-copy reader if needed.
+    if (toRelease != null) {
+      assert dataReader.isTrackingDiskRanges();
+      for (ByteBuffer buffer : toRelease) {
+        dataReader.releaseBuffer(buffer);
+      }
+    }
 
     // 6. Finally, put uncompressed data to cache.
     if (fileKey != null) {
-      long[] collisionMask = cacheWrapper.putFileData(
-          fileKey, cacheKeys, targetBuffers, baseOffset);
+      long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
     }
 
-    // 7. It may happen that we know we won't use some cache buffers anymore (the alternative
-    //    is that we will use the same buffers for other streams in separate calls).
+    // 7. It may happen that we know we won't use some compression buffers anymore.
     //    Release initial refcounts.
     for (ProcCacheChunk chunk : toDecompress) {
       ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
@@ -773,11 +713,9 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
-  /** Subset of readEncodedStream specific to compressed streams, separate to avoid long methods. */
   private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current,
-      ColumnStreamData columnStreamData, IdentityHashMap<ByteBuffer, Boolean> toRelease,
-      List<ByteBuffer> toReleaseCopies, List<ProcCacheChunk> toDecompress,
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
+      List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
       List<IncompleteCb> badEstimates) throws IOException {
     if (cOffset > current.getOffset()) {
       // Target compression block is in the middle of the range; slice the range in two.
@@ -824,8 +762,8 @@ class EncodedReaderImpl implements EncodedReader {
           throw new RuntimeException(msg);
         }
         BufferChunk bc = (BufferChunk)current;
-        ProcCacheChunk newCached = addOneCompressionBuffer(bc, columnStreamData.getCacheBuffers(),
-            toDecompress, toRelease, toReleaseCopies, badEstimates);
+        ProcCacheChunk newCached = addOneCompressionBuffer(
+            bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
         lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
         next = (newCached != null) ? newCached.next : null;
         currentOffset = (next != null) ? next.getOffset() : -1;
@@ -839,12 +777,9 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
-  /** Subset of readEncodedStream specific to uncompressed streams, separate to avoid long methods. */
   private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current,
-      ColumnStreamData columnStreamData) throws IOException {
-    // Note: we are called after preReadUncompressedStream, so it doesn't have to do nearly as much
-    //       as prepareRangesForCompressedRead does; e.g. every buffer is already a CacheChunk.
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
+          throws IOException {
     long currentOffset = cOffset;
     CacheChunk lastUncompressed = null;
     boolean isFirst = true;
@@ -884,10 +819,11 @@ class EncodedReaderImpl implements EncodedReader {
    * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
    * allocator. Uncompressed case is not mainline though so let's not complicate it.
    */
-  private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
-      long streamOffset, long streamEnd) throws IOException {
+  private DiskRangeList preReadUncompressedStream(long baseOffset,
+      DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
     if (streamOffset == streamEnd) return null;
     List<UncompressedCacheChunk> toCache = null;
+    List<ByteBuffer> toRelease = null;
 
     // 1. Find our bearings in the stream.
     DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
@@ -924,6 +860,9 @@ class EncodedReaderImpl implements EncodedReader {
       if (current.getOffset() >= partEnd) {
         continue; // We have no data at all for this part of the stream (could be unneeded), skip.
       }
+      if (toRelease == null && dataReader.isTrackingDiskRanges()) {
+        toRelease = new ArrayList<ByteBuffer>();
+      }
       // We have some disk buffers... see if we have entire part, etc.
       UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
       DiskRangeList next = current;
@@ -938,15 +877,21 @@ class EncodedReaderImpl implements EncodedReader {
         current = next;
         if (noMoreDataForPart) break; // Done with this part.
 
+        boolean wasSplit = false;
         if (current.getEnd() > partEnd) {
           // If the current buffer contains multiple parts, split it.
           current = current.split(partEnd);
+          wasSplit = true;
         }
         if (isTracingEnabled) {
           LOG.trace("Processing uncompressed file data at ["
               + current.getOffset() + ", " + current.getEnd() + ")");
         }
         BufferChunk curBc = (BufferChunk)current;
+        if (!wasSplit && toRelease != null) {
+          toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
+        }
+
         // Track if we still have the entire part.
         long hadEntirePartTo = hasEntirePartTo;
         // We have data until the end of current block if we had it until the beginning.
@@ -1007,7 +952,15 @@ class EncodedReaderImpl implements EncodedReader {
       ++ix;
     }
 
-    // 5. Put uncompressed data to cache.
+    // 5. Release original compressed buffers to zero-copy reader if needed.
+    if (toRelease != null) {
+      assert dataReader.isTrackingDiskRanges();
+      for (ByteBuffer buf : toRelease) {
+        dataReader.releaseBuffer(buf);
+      }
+    }
+
+    // 6. Finally, put uncompressed data to cache.
     if (fileKey != null) {
       long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toCache, targetBuffers, null);
@@ -1016,6 +969,7 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
+
   private int determineUncompressedPartSize() {
     // We will break the uncompressed data in the cache in the chunks that are the size
     // of the prevalent ORC compression buffer (the default), or maximum allocation (since we
@@ -1224,8 +1178,7 @@ class EncodedReaderImpl implements EncodedReader {
    */
   private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
       List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
-      IdentityHashMap<ByteBuffer, Boolean> toRelease, List<ByteBuffer> toReleaseCopies,
-      List<IncompleteCb> badEstimates) throws IOException {
+      List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.getChunk();
     long cbStartOffset = current.getOffset();
@@ -1248,8 +1201,12 @@ class EncodedReaderImpl implements EncodedReader {
       // Simple case - CB fits entirely in the disk range.
       slice = compressed.slice();
       slice.limit(chunkLength);
-      return addOneCompressionBlockByteBuffer(slice, isUncompressed,
+      ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
           cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
+      if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+        toRelease.add(compressed);
+      }
+      return cc;
     }
     if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
       badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
@@ -1259,7 +1216,6 @@ class EncodedReaderImpl implements EncodedReader {
     // TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
     // We need to consolidate 2 or more buffers into one to decompress.
     ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
-    toReleaseCopies.add(copy); // We will always release copies at the end.
     int remaining = chunkLength - compressed.remaining();
     int originalPos = compressed.position();
     copy.put(compressed);
@@ -1268,8 +1224,12 @@ class EncodedReaderImpl implements EncodedReader {
     }
     DiskRangeList next = current.next;
     current.removeSelf();
-    if (originalPos == 0 && toRelease.remove(compressed)) {
-      releaseBuffer(compressed, true);
+    if (dataReader.isTrackingDiskRanges()) {
+      if (originalPos == 0) {
+        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+      } else {
+        toRelease.add(compressed); // There might be slices depending on this buffer.
+      }
     }
 
     int extraChunkCount = 0;
@@ -1286,15 +1246,15 @@ class EncodedReaderImpl implements EncodedReader {
         copy.put(slice);
         ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
             cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
-        if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
-          releaseBuffer(compressed, true); // We copied the entire buffer. 
-        } // else there's more data to process; will be handled in next call.
+        if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+          dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+        }
         return cc;
       }
       remaining -= compressed.remaining();
-      copy.put(compressed); // TODO: move into the if below; account for release call
-      if (toRelease.remove(compressed)) {
-        releaseBuffer(compressed, true); // We copied the entire buffer.
+      copy.put(compressed);
+      if (dataReader.isTrackingDiskRanges()) {
+        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
       }
       DiskRangeList tmp = next;
       next = next.hasContiguousNext() ? next.next : null;
@@ -1310,38 +1270,6 @@ class EncodedReaderImpl implements EncodedReader {
     }
   }
 
-  private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
-    if (toRelease == null) return;
-    for (ByteBuffer buf : toRelease) {
-      releaseBuffer(buf, isFromDataReader);
-    }
-  }
-
-  private void releaseBuffer(ByteBuffer bb, boolean isFromDataReader) {
-    if (isTracingEnabled) {
-      LOG.trace("Releasing the buffer " + System.identityHashCode(bb));
-    }
-    if (isFromDataReader && dataReader.isTrackingDiskRanges()) {
-      dataReader.releaseBuffer(bb);
-      return;
-    }
-    Field localCf = cleanerField;
-    if (!bb.isDirect() || localCf == null) return;
-    try {
-      Cleaner cleaner = (Cleaner) localCf.get(bb);
-      if (cleaner != null) {
-        cleaner.clean();
-      } else {
-        LOG.debug("Unable to clean a buffer using cleaner - no cleaner");
-      }
-    } catch (Exception e) {
-      // leave it for GC to clean up
-      LOG.warn("Unable to clean direct buffers using Cleaner.");
-      cleanerField = null;
-    }
-  }
-
-
   private IncompleteCb addIncompleteCompressionBuffer(
       long cbStartOffset, DiskRangeList target, int extraChunkCount) {
     IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index a7bb5ee..26f1e75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -21,7 +21,6 @@ import java.util.Properties;
 import java.util.TimeZone;
 
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,11 +139,14 @@ public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, Pa
     String timeZoneID =
         tableProperties.getProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY);
     if (!Strings.isNullOrEmpty(timeZoneID)) {
-
-      NanoTimeUtils.validateTimeZone(timeZoneID);
+      if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) {
+        throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
+      }
       return TimeZone.getTimeZone(timeZoneID);
     }
 
-    return TimeZone.getDefault();
+    // If no timezone is defined in table properties, then adjust timestamps using
+    // PARQUET_INT96_NO_ADJUSTMENT_ZONE timezone
+    return TimeZone.getTimeZone(ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 2954601..8e33b7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -20,7 +20,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.TimeZone;
 
@@ -170,7 +170,7 @@ public class ParquetRecordReaderBase {
     boolean skipConversion = HiveConf.getBoolVar(configuration,
         HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION);
     FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-    if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") &&
+    if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") ||
         skipConversion) {
       // Impala writes timestamp values using GMT only. We should not try to convert Impala
       // files to other type of timezones.
@@ -179,12 +179,16 @@ public class ParquetRecordReaderBase {
       // TABLE_PARQUET_INT96_TIMEZONE is a table property used to detect what timezone conversion
       // to use when reading Parquet timestamps.
       timeZoneID = configuration.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,
-          TimeZone.getDefault().getID());
-      NanoTimeUtils.validateTimeZone(timeZoneID);
+          ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
+
+      if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) {
+          throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
+      }
     }
 
     // 'timeZoneID' should be valid, since we did not throw exception above
-    configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,timeZoneID);
+    configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,
+        TimeZone.getTimeZone(timeZoneID).getID());
   }
 
   public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
index dbd6fb3..5dc8088 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
@@ -152,26 +152,13 @@ public class NanoTimeUtils {
 
     calendar.setTimeInMillis(utcCalendar.getTimeInMillis());
 
-    Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, getLocalCalendar());
+    Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, Calendar.getInstance());
 
     Timestamp ts = new Timestamp(adjusterCalendar.getTimeInMillis());
     ts.setNanos((int) nanos);
     return ts;
   }
 
-  /**
-   * Check if the string id is a valid java TimeZone id.
-   * TimeZone#getTimeZone will return "GMT" if the id cannot be understood.
-   * @param timeZoneID
-   */
-  public static void validateTimeZone(String timeZoneID) {
-    if (TimeZone.getTimeZone(timeZoneID).getID().equals("GMT")
-        && !"GMT".equals(timeZoneID)) {
-      throw new IllegalStateException(
-          "Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
-    }
-  }
-
   private static Calendar copyToCalendarWithTZ(Calendar from, Calendar to) {
     if(from.getTimeZone().getID().equals(to.getTimeZone().getID())) {
       return from;

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 312cdac..6ca1963 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -121,9 +121,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     try {
       serDeStats = new SerDeStats();
       projectionPusher = new ProjectionPusher();
-      ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
-      if (inputSplit != null) {
-        initialize(inputSplit, conf);
+      if (oldInputSplit != null) {
+        initialize(getSplit(oldInputSplit, conf), conf);
         setTimeZoneConversion(jobConf, ((FileSplit) oldInputSplit).getPath());
       }
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index 5401c7b..00c9645 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
@@ -75,6 +77,8 @@ import org.apache.logging.log4j.core.appender.RollingFileAppender;
 @SuppressWarnings( { "deprecation"})
 public class PartialScanTask extends Task<PartialScanWork> implements
     Serializable, HadoopJobExecHook {
+
+
   private static final long serialVersionUID = 1L;
 
   protected transient JobConf job;
@@ -270,7 +274,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
     return "RCFile Statistics Partial Scan";
   }
 
-  public static final String INPUT_SEPERATOR = ":";
+  public static String INPUT_SEPERATOR = ":";
 
   public static void main(String[] args) {
     String inputPathStr = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 01e8a48..d255265 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -21,11 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.LockTableDesc;
-import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,13 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  * with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
  * The later may (usually will) be called from a timer thread.
  * See {@link #getMS()} for more important concurrency/metastore access notes.
- * 
- * Each statement that the TM (transaction manager) should be aware of should belong to a transaction.
- * Effectively, that means any statement that has side effects.  Exceptions are statements like
- * Show Compactions, Show Tables, Use Database foo, etc.  The transaction is started either
- * explicitly ( via Start Transaction SQL statement from end user - not fully supported) or
- * implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks exactly as autoCommit=true
- * from end user poit of view). See more at {@link #isExplicitTransaction}.
  */
 public final class DbTxnManager extends HiveTxnManagerImpl {
 
@@ -88,47 +76,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    * to keep apart multiple writes of the same data within the same transaction
    * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
    */
-  private int writeId = -1;
-  /**
-   * counts number of statements in the current transaction
-   */
-  private int numStatements = 0;
-  /**
-   * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot
-   * include any Operations which cannot be rolled back (drop partition; write to  non-acid table).
-   * If false, it's a single statement transaction which can include any statement.  This is not a 
-   * contradiction from the user point of view who doesn't know anything about the implicit txn
-   * and cannot call rollback (the statement of course can fail in which case there is nothing to 
-   * rollback (assuming the statement is well implemented)).
-   *
-   * This is done so that all commands run in a transaction which simplifies implementation and
-   * allows a simple implementation of multi-statement txns which don't require a lock manager
-   * capable of deadlock detection.  (todo: not fully implemented; elaborate on how this LM works)
-   *
-   * Also, critically important, ensuring that everything runs in a transaction assigns an order
-   * to all operations in the system - needed for replication/DR.
-   *
-   * We don't want to allow non-transactional statements in a user demarcated txn because the effect
-   * of such statement is "visible" immediately on statement completion, but the user may
-   * issue a rollback but the action of the statement can't be undone (and has possibly already been
-   * seen by another txn).  For example,
-   * start transaction
-   * insert into transactional_table values(1);
-   * insert into non_transactional_table select * from transactional_table;
-   * rollback
-   *
-   * The user would be in for a surprise especially if they are not aware of transactional
-   * properties of the tables involved.
-   *
-   * As a side note: what should the lock manager do with locks for non-transactional resources?
-   * Should it it release them at the end of the stmt or txn?
-   * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html
-   */
-  private boolean isExplicitTransaction = false;
-  /**
-   * To ensure transactions don't nest.
-   */
-  private int startTransactionCount = 0;
+  private int statementId = -1;
 
   // QueryId for the query in current transaction
   private String queryId;
@@ -193,22 +141,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
 
   @VisibleForTesting
   long openTxn(Context ctx, String user, long delay) throws LockException {
-    /*Q: why don't we lock the snapshot here???  Instead of having client make an explicit call
-    whenever it chooses
-    A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
-    acquisition.  Relying on locks is a pessimistic strategy which works better under high
-    contention.*/
+    //todo: why don't we lock the snapshot here???  Instead of having client make an explicit call
+    //whenever it chooses
     init();
-    getLockManager();
     if(isTxnOpen()) {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
     }
     try {
       txnId = getMS().openTxn(user);
-      writeId = 0;
-      numStatements = 0;
-      isExplicitTransaction = false;
-      startTransactionCount = 0;
+      statementId = 0;
       LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
       ctx.setHeartbeater(startHeartbeat(delay));
       return txnId;
@@ -218,8 +159,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   /**
-   * we don't expect multiple threads to call this method concurrently but {@link #lockMgr} will
-   * be read by a different threads than one writing it, thus it's {@code volatile}
+   * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will
+   * be read by a different threads that one writing it, thus it's {@code volatile}
    */
   @Override
   public HiveLockManager getLockManager() throws LockException {
@@ -238,95 +179,24 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     catch(LockException e) {
       if(e.getCause() instanceof TxnAbortedException) {
         txnId = 0;
-        writeId = -1;
+        statementId = -1;
       }
       throw e;
     }
   }
 
   /**
-   * Watermark to include in error msgs and logs
-   * @param queryPlan
-   * @return
-   */
-  private static String getQueryIdWaterMark(QueryPlan queryPlan) {
-    return "queryId=" + queryPlan.getQueryId();
-  }
-
-  private void markExplicitTransaction(QueryPlan queryPlan) throws LockException {
-    isExplicitTransaction = true;
-    if(++startTransactionCount > 1) {
-      throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
-        JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
-    }
-
-  }
-  /**
-   * Ensures that the current SQL statement is appropriate for the current state of the
-   * Transaction Manager (e.g. can call commit unless you called start transaction)
-   * 
-   * Note that support for multi-statement txns is a work-in-progress so it's only supported in
-   * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
-   * @param queryPlan
-   * @throws LockException
-   */
-  private void verifyState(QueryPlan queryPlan) throws LockException {
-    if(!isTxnOpen()) {
-      throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + 
-        " for " + getQueryIdWaterMark(queryPlan));
-    }
-    if(queryPlan.getOperation() == null) {
-      throw new IllegalStateException("Unkown HiverOperation for " + getQueryIdWaterMark(queryPlan));
-    }
-    numStatements++;
-    switch (queryPlan.getOperation()) {
-      case START_TRANSACTION:
-        markExplicitTransaction(queryPlan);
-        break;
-      case COMMIT:
-      case ROLLBACK:
-        if(!isTxnOpen()) {
-          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName());
-        }
-        if(!isExplicitTransaction) {
-          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, queryPlan.getOperationName());
-        }
-        break;
-      default:
-        if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
-          //for example, drop table in an explicit txn is not allowed
-          //in some cases this requires looking at more than just the operation
-          //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
-          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
-            JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
-        }
-    }
-    /*
-    Should we allow writing to non-transactional tables in an explicit transaction?  The user may
-    issue ROLLBACK but these tables won't rollback.
-    Can do this by checking ReadEntity/WriteEntity to determine whether it's reading/writing
-    any non acid and raise an appropriate error
-    * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
-  }
-  /**
-   * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
+   * This is for testing only.  Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
    * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
    * @return null if no locks were needed
    */
-  @VisibleForTesting
   LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
     init();
-    // Make sure we've built the lock manager
+        // Make sure we've built the lock manager
     getLockManager();
-    verifyState(plan);
+
     boolean atLeastOneLock = false;
     queryId = plan.getQueryId();
-    switch (plan.getOperation()) {
-      case SET_AUTOCOMMIT:
-        /**This is here for documentation purposes.  This TM doesn't support this - only has one
-        * mode of operation documented at {@link DbTxnManager#isExplicitTransaction}*/
-        return  null;
-    }
 
     LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId);
     //link queryId to txnId
@@ -370,8 +240,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           // This is a file or something we don't hold locks for.
           continue;
       }
-      if(t != null) {
-        compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
+      if(t != null && AcidUtils.isFullAcidTable(t)) {
+        compBuilder.setIsAcid(true);
       }
       LockComponent comp = compBuilder.build();
       LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -392,33 +262,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       }
       LockComponentBuilder compBuilder = new LockComponentBuilder();
       Table t = null;
-      switch (output.getType()) {
-        case DATABASE:
-          compBuilder.setDbName(output.getDatabase().getName());
-          break;
-
-        case TABLE:
-        case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
-          t = output.getTable();
-          compBuilder.setDbName(t.getDbName());
-          compBuilder.setTableName(t.getTableName());
-          break;
-
-        case PARTITION:
-          compBuilder.setPartitionName(output.getPartition().getName());
-          t = output.getPartition().getTable();
-          compBuilder.setDbName(t.getDbName());
-          compBuilder.setTableName(t.getTableName());
-          break;
-
-        default:
-          // This is a file or something we don't hold locks for.
-          continue;
-      }
       switch (output.getWriteType()) {
-        /* base this on HiveOperation instead?  this and DDL_NO_LOCK is peppered all over the code...
-         Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
-         makes sense everywhere).  This however would be problematic for merge...*/
         case DDL_EXCLUSIVE:
         case INSERT_OVERWRITE:
           compBuilder.setExclusive();
@@ -426,9 +270,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           break;
 
         case INSERT:
-          assert t != null;
+          t = getTable(output);
           if(AcidUtils.isFullAcidTable(t)) {
             compBuilder.setShared();
+            compBuilder.setIsAcid(true);
           }
           else {
             if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
@@ -436,6 +281,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
             } else {  // this is backward compatible for non-ACID resources, w/o ACID semantics
               compBuilder.setShared();
             }
+            compBuilder.setIsAcid(false);
           }
           compBuilder.setOperationType(DataOperationType.INSERT);
           break;
@@ -447,10 +293,12 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
         case UPDATE:
           compBuilder.setSemiShared();
           compBuilder.setOperationType(DataOperationType.UPDATE);
+          t = getTable(output);
           break;
         case DELETE:
           compBuilder.setSemiShared();
           compBuilder.setOperationType(DataOperationType.DELETE);
+          t = getTable(output);
           break;
 
         case DDL_NO_LOCK:
@@ -459,11 +307,34 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
         default:
           throw new RuntimeException("Unknown write type " +
               output.getWriteType().toString());
+
       }
-      if(t != null) {
-        compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
-      }
+      switch (output.getType()) {
+        case DATABASE:
+          compBuilder.setDbName(output.getDatabase().getName());
+          break;
 
+        case TABLE:
+        case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
+          t = output.getTable();
+          compBuilder.setDbName(t.getDbName());
+          compBuilder.setTableName(t.getTableName());
+          break;
+
+        case PARTITION:
+          compBuilder.setPartitionName(output.getPartition().getName());
+          t = output.getPartition().getTable();
+          compBuilder.setDbName(t.getDbName());
+          compBuilder.setTableName(t.getTableName());
+          break;
+
+        default:
+          // This is a file or something we don't hold locks for.
+          continue;
+      }
+      if(t != null && AcidUtils.isFullAcidTable(t)) {
+        compBuilder.setIsAcid(true);
+      }
       compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
       LockComponent comp = compBuilder.build();
       LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -534,8 +405,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      writeId = -1;
-      numStatements = 0;
+      statementId = -1;
     }
   }
 
@@ -559,8 +429,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      writeId = -1;
-      numStatements = 0;
+      statementId = -1;
     }
   }
 
@@ -687,26 +556,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   public boolean supportsExplicitLock() {
     return false;
   }
-  @Override
-  public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
-    super.lockTable(db, lockTbl);
-    throw new UnsupportedOperationException();
-  }
-  @Override
-  public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException {
-    super.unlockTable(hiveDB, unlockTbl);
-    throw new UnsupportedOperationException();
-  }
-  @Override
-  public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException {
-    super.lockDatabase(hiveDB, lockDb);
-    throw new UnsupportedOperationException();
-  }
-  @Override
-  public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException {
-    super.unlockDatabase(hiveDB, unlockDb);
-    throw new UnsupportedOperationException();
-  }
 
   @Override
   public boolean useNewShowLocksFormat() {
@@ -717,44 +566,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   public boolean supportsAcid() {
     return true;
   }
-  /**
-   * In an explicit txn start_transaction is the 1st statement and we record the snapshot at the
-   * start of the txn for Snapshot Isolation.  For Read Committed (not supported yet) we'd record
-   * it before executing each statement (but after lock acquisition if using lock based concurrency
-   * control).
-   * For implicit txn, the stmt that triggered/started the txn is the first statement
-   */
-  @Override
-  public boolean recordSnapshot(QueryPlan queryPlan) {
-    assert isTxnOpen();
-    assert numStatements > 0 : "was acquireLocks() called already?";
-    if(queryPlan.getOperation() == HiveOperation.START_TRANSACTION) {
-      //here if start of explicit txn
-      assert isExplicitTransaction;
-      assert numStatements == 1;
-      return true;
-    }
-    else if(!isExplicitTransaction) {
-      assert numStatements == 1 : "numStatements=" + numStatements + " in implicit txn";
-      if (queryPlan.hasAcidResourcesInQuery()) {
-        //1st and only stmt in implicit txn and uses acid resource
-        return true;
-      }
-    }
-    return false;
-  }
-  @Override
-  public boolean isImplicitTransactionOpen() {
-    if(!isTxnOpen()) {
-      //some commands like "show databases" don't start implicit transactions
-      return false;
-    }
-    if(!isExplicitTransaction) {
-      assert numStatements == 1 : "numStatements=" + numStatements;
-      return true;
-    }
-    return false;
-  }
+
   @Override
   protected void destruct() {
     try {
@@ -814,7 +626,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   @Override
   public int getWriteIdAndIncrement() {
     assert isTxnOpen();
-    return writeId++;
+    return statementId++;
   }
 
   private static long getHeartbeatInterval(Configuration conf) throws LockException {

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 24df25b..53ee9c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -301,8 +301,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
       new HiveLockObject.HiveLockObjectData(plan.getQueryId(),
                              String.valueOf(System.currentTimeMillis()),
                              "IMPLICIT",
-                             plan.getQueryStr(),
-                             conf);
+                             plan.getQueryStr());
 
     if (db != null) {
       locks.add(new HiveLockObj(new HiveLockObject(db.getName(), lockData),

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
index a514339..fff03df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -49,23 +48,16 @@ public class HiveLockObject {
      * Note: The parameters are used to uniquely identify a HiveLockObject. 
      * The parameters will be stripped off any ':' characters in order not 
      * to interfere with the way the data is serialized (':' delimited string).
-     * The query string might be truncated depending on HIVE_LOCK_QUERY_STRING_MAX_LENGTH
-     * @param queryId The query identifier will be added to the object without change
-     * @param lockTime The lock time  will be added to the object without change
-     * @param lockMode The lock mode  will be added to the object without change
-     * @param queryStr The query string might be truncated based on
-     *     HIVE_LOCK_QUERY_STRING_MAX_LENGTH conf variable
-     * @param conf The hive configuration based on which we decide if we should truncate the query
-     *     string or not
      */
-    public HiveLockObjectData(String queryId, String lockTime, String lockMode, String queryStr,
-        HiveConf conf) {
+    public HiveLockObjectData(String queryId,
+        String lockTime,
+        String lockMode,
+        String queryStr) {
       this.queryId = removeDelimiter(queryId);
       this.lockTime = StringInternUtils.internIfNotNull(removeDelimiter(lockTime));
       this.lockMode = removeDelimiter(lockMode);
       this.queryStr = StringInternUtils.internIfNotNull(
-          queryStr == null ? null : StringUtils.substring(removeDelimiter(queryStr.trim()), 0,
-              conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_QUERY_STRING_MAX_LENGTH)));
+          removeDelimiter(queryStr == null ? null : queryStr.trim()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index b24351c..187a658 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -72,7 +72,7 @@ public interface HiveTxnManager {
 
   /**
    * Acquire all of the locks needed by a query.  If used with a query that
-   * requires transactions, this should be called after {@link #openTxn(Context, String)}.
+   * requires transactions, this should be called after {@link #openTxn(String)}.
    * A list of acquired locks will be stored in the
    * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
    * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.
@@ -208,13 +208,17 @@ public interface HiveTxnManager {
   boolean supportsAcid();
 
   /**
-   * For resources that support MVCC, the state of the DB must be recorded for the duration of the
-   * operation/transaction.  Returns {@code true} if current statment needs to do this.
+   * This behaves exactly as
+   * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)
+   */
+  void setAutoCommit(boolean autoCommit) throws LockException;
+
+  /**
+   * This behaves exactly as
+   * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit()
    */
-  boolean recordSnapshot(QueryPlan queryPlan);
+  boolean getAutoCommit();
 
-  boolean isImplicitTransactionOpen();
-  
   boolean isTxnOpen();
   /**
    * if {@code isTxnOpen()}, returns the currently active transaction ID

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index 8dbbf87..a371a5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 abstract class HiveTxnManagerImpl implements HiveTxnManager {
 
   protected HiveConf conf;
+  private boolean isAutoCommit = true;//true by default; matches JDBC spec
 
   void setHiveConf(HiveConf c) {
     conf = c;
@@ -67,6 +68,16 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
     destruct();
   }
   @Override
+  public void setAutoCommit(boolean autoCommit) throws LockException {
+    isAutoCommit = autoCommit;
+  }
+
+  @Override
+  public boolean getAutoCommit() {
+    return isAutoCommit;
+  }
+
+  @Override
   public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
     HiveLockManager lockMgr = getAndCheckLockManager();
 
@@ -82,8 +93,7 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
         new HiveLockObjectData(lockTbl.getQueryId(),
             String.valueOf(System.currentTimeMillis()),
             "EXPLICIT",
-            lockTbl.getQueryStr(),
-            conf);
+            lockTbl.getQueryStr());
 
     if (partSpec == null) {
       HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true);
@@ -141,7 +151,7 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
     HiveLockObjectData lockData =
         new HiveLockObjectData(lockDb.getQueryId(),
             String.valueOf(System.currentTimeMillis()),
-            "EXPLICIT", lockDb.getQueryStr(), conf);
+            "EXPLICIT", lockDb.getQueryStr());
 
     HiveLock lck = lockMgr.lock(new HiveLockObject(dbObj.getName(), lockData), mode, true);
     if (lck == null) {
@@ -192,13 +202,4 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
 
     return lockMgr;
   }
-  @Override
-  public boolean recordSnapshot(QueryPlan queryPlan) {
-    return false;
-  }
-  @Override
-  public boolean isImplicitTransactionOpen() {
-    return true;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 9b46ae7..c2a4806 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -285,10 +285,8 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
     int tryNum = 0;
     ZooKeeperHiveLock ret = null;
     Set<String> conflictingLocks = new HashSet<String>();
-    Exception lastException = null;
 
     do {
-      lastException = null;
       tryNum++;
       try {
         if (tryNum > 1) {
@@ -300,22 +298,26 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
           break;
         }
       } catch (Exception e1) {
-        lastException = e1;
         if (e1 instanceof KeeperException) {
           KeeperException e = (KeeperException) e1;
           switch (e.code()) {
           case CONNECTIONLOSS:
           case OPERATIONTIMEOUT:
-          case NONODE:
-          case NODEEXISTS:
             LOG.debug("Possibly transient ZooKeeper exception: ", e);
-            break;
+            continue;
           default:
             LOG.error("Serious Zookeeper exception: ", e);
             break;
           }
-        } else {
-          LOG.error("Other unexpected exception: ", e1);
+        }
+        if (tryNum >= numRetriesForLock) {
+          console.printError("Unable to acquire " + key.getData().getLockMode()
+              + ", " + mode + " lock " + key.getDisplayName() + " after "
+              + tryNum + " attempts.");
+          LOG.error("Exceeds maximum retries with errors: ", e1);
+          printConflictingLocks(key,mode,conflictingLocks);
+          conflictingLocks.clear();
+          throw new LockException(e1);
         }
       }
     } while (tryNum < numRetriesForLock);
@@ -325,11 +327,8 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
           + ", " + mode + " lock " + key.getDisplayName() + " after "
           + tryNum + " attempts.");
       printConflictingLocks(key,mode,conflictingLocks);
-      if (lastException != null) {
-        LOG.error("Exceeds maximum retries with errors: ", lastException);
-        throw new LockException(lastException);
-      }
     }
+    conflictingLocks.clear();
     return ret;
   }
 
@@ -351,19 +350,6 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
     }
   }
 
-  /**
-   * Creates a primitive lock object on ZooKeeper.
-   * @param key The lock data
-   * @param mode The lock mode (HiveLockMode - EXCLUSIVE/SHARED/SEMI_SHARED)
-   * @param keepAlive If true creating PERSISTENT ZooKeeper locks, otherwise EPHEMERAL ZooKeeper
-   *                  locks
-   * @param parentCreated If we expect, that the parent is already created then true, otherwise
-   *                      we will try to create the parents as well
-   * @param conflictingLocks The set where we should collect the conflicting locks when
-   *                         the logging level is set to DEBUG
-   * @return The created ZooKeeperHiveLock object, null if there was a conflicting lock
-   * @throws Exception If there was an unexpected Exception
-   */
   private ZooKeeperHiveLock lockPrimitive(HiveLockObject key,
       HiveLockMode mode, boolean keepAlive, boolean parentCreated,
       Set<String> conflictingLocks)
@@ -404,7 +390,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
     int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
     if (seqNo == -1) {
       curatorFramework.delete().forPath(res);
-      throw new LockException("The created node does not contain a sequence number: " + res);
+      return null;
     }
 
     List<String> children = curatorFramework.getChildren().forPath(lastName);
@@ -598,6 +584,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
 
   /**
    * @param conf        Hive configuration
+   * @param zkpClient   The ZooKeeper client
    * @param key         The object to be compared against - if key is null, then get all locks
    **/
   private static List<HiveLock> getLocks(HiveConf conf,

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
deleted file mode 100644
index 64ce100..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.log;
-
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.session.OperationLog;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.appender.RandomAccessFileAppender;
-import org.apache.logging.log4j.core.appender.routing.Route;
-import org.apache.logging.log4j.core.appender.routing.Routes;
-import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
-import org.apache.logging.log4j.core.config.Configuration;
-import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.logging.log4j.core.config.Node;
-import org.apache.logging.log4j.core.config.plugins.Plugin;
-import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
-import org.apache.logging.log4j.core.config.plugins.PluginFactory;
-import org.apache.logging.log4j.core.config.plugins.processor.PluginEntry;
-import org.apache.logging.log4j.core.config.plugins.util.PluginType;
-import org.apache.logging.log4j.core.filter.AbstractFilter;
-import org.apache.logging.log4j.core.layout.PatternLayout;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-
-/**
- * Divert appender to redirect operation logs to separate files.
- */
-public class LogDivertAppender {
-  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LogDivertAppender.class.getName());
-  public static final String verboseLayout = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n";
-  public static final String nonVerboseLayout = "%-5p : %m%n";
-
-  /**
-   * A log filter that filters messages coming from the logger with the given names.
-   * It be used as a white list filter or a black list filter.
-   * We apply black list filter on the Loggers used by the log diversion stuff, so that
-   * they don't generate more logs for themselves when they process logs.
-   * White list filter is used for less verbose log collection
-   */
-  @Plugin(name = "NameFilter", category = "Core", elementType="filter", printObject = true)
-  private static class NameFilter extends AbstractFilter {
-    private Pattern namePattern;
-    private OperationLog.LoggingLevel loggingMode;
-
-    /* Patterns that are excluded in verbose logging level.
-     * Filter out messages coming from log processing classes, or we'll run an infinite loop.
-     */
-    private static final Pattern verboseExcludeNamePattern = Pattern.compile(Joiner.on("|").
-        join(new String[]{LOG.getName(), OperationLog.class.getName()}));
-
-    /* Patterns that are included in execution logging level.
-     * In execution mode, show only select logger messages.
-     */
-    private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|").
-        join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter",
-            "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
-            Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
-
-    /* Patterns that are included in performance logging level.
-     * In performance mode, show execution and performance logger messages.
-     */
-    private static final Pattern performanceIncludeNamePattern = Pattern.compile(
-        executionIncludeNamePattern.pattern() + "|" + PerfLogger.class.getName());
-
-    private void setCurrentNamePattern(OperationLog.LoggingLevel mode) {
-      if (mode == OperationLog.LoggingLevel.VERBOSE) {
-        this.namePattern = verboseExcludeNamePattern;
-      } else if (mode == OperationLog.LoggingLevel.EXECUTION) {
-        this.namePattern = executionIncludeNamePattern;
-      } else if (mode == OperationLog.LoggingLevel.PERFORMANCE) {
-        this.namePattern = performanceIncludeNamePattern;
-      }
-    }
-
-    public NameFilter(OperationLog.LoggingLevel loggingMode) {
-      this.loggingMode = loggingMode;
-      setCurrentNamePattern(loggingMode);
-    }
-
-    @Override
-    public Result filter(LogEvent event) {
-      boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
-
-      String logLevel = event.getContextMap().get(LogUtils.OPERATIONLOG_LEVEL_KEY);
-      logLevel = logLevel == null ? "" : logLevel;
-      OperationLog.LoggingLevel currentLoggingMode = OperationLog.getLoggingLevel(logLevel);
-      // If logging is disabled, deny everything.
-      if (currentLoggingMode == OperationLog.LoggingLevel.NONE) {
-        return Result.DENY;
-      }
-      // Look at the current session's setting
-      // and set the pattern and excludeMatches accordingly.
-      if (currentLoggingMode != loggingMode) {
-        loggingMode = currentLoggingMode;
-        excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
-        setCurrentNamePattern(loggingMode);
-      }
-
-      boolean isMatch = namePattern.matcher(event.getLoggerName()).matches();
-
-      if (excludeMatches == isMatch) {
-        // Deny if this is black-list filter (excludeMatches = true) and it
-        // matched or if this is whitelist filter and it didn't match
-        return Result.DENY;
-      }
-
-      return Result.NEUTRAL;
-    }
-
-    @PluginFactory
-    public static NameFilter createFilter(
-        @PluginAttribute("loggingLevel") final String loggingLevel) {
-      // Name required for routing. Error out if it is not set.
-      Preconditions.checkNotNull(loggingLevel,
-          "loggingLevel must be specified for " + NameFilter.class.getName());
-
-      return new NameFilter(OperationLog.getLoggingLevel(loggingLevel));
-    }
-  }
-
-  /**
-   * Programmatically register a routing appender to Log4J configuration, which
-   * automatically writes the log of each query to an individual file.
-   * The equivilent property configuration is as follows:
-   * # queryId based routing file appender
-      appender.query-routing.type = Routing
-      appender.query-routing.name = query-routing
-      appender.query-routing.routes.type = Routes
-      appender.query-routing.routes.pattern = $${ctx:queryId}
-      # default route
-      appender.query-routing.routes.route-default.type = Route
-      appender.query-routing.routes.route-default.key = $${ctx:queryId}
-      appender.query-routing.routes.route-default.app.type = null
-      appender.query-routing.routes.route-default.app.name = Null
-      # queryId based route
-      appender.query-routing.routes.route-mdc.type = Route
-      appender.query-routing.routes.route-mdc.name = IrrelevantName-query-routing
-      appender.query-routing.routes.route-mdc.app.type = RandomAccessFile
-      appender.query-routing.routes.route-mdc.app.name = query-file-appender
-      appender.query-routing.routes.route-mdc.app.fileName = ${sys:hive.log.dir}/${ctx:sessionId}/${ctx:queryId}
-      appender.query-routing.routes.route-mdc.app.layout.type = PatternLayout
-      appender.query-routing.routes.route-mdc.app.layout.pattern = %d{ISO8601} %5p %c{2}: %m%n
-   * @param conf  the configuration for HiveServer2 instance
-   */
-  public static void registerRoutingAppender(org.apache.hadoop.conf.Configuration conf) {
-    String loggingLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL);
-    OperationLog.LoggingLevel loggingMode = OperationLog.getLoggingLevel(loggingLevel);
-    String layout = loggingMode == OperationLog.LoggingLevel.VERBOSE ? verboseLayout : nonVerboseLayout;
-    String logLocation = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION);
-
-    // Create NullAppender
-    PluginEntry nullEntry = new PluginEntry();
-    nullEntry.setClassName(NullAppender.class.getName());
-    nullEntry.setKey("null");
-    nullEntry.setName("appender");
-    PluginType<NullAppender> nullChildType = new PluginType<NullAppender>(nullEntry, NullAppender.class, "appender");
-    Node nullChildNode = new Node(null, "Null", nullChildType);
-
-    // Create default route
-    PluginEntry defaultEntry = new PluginEntry();
-    defaultEntry.setClassName(Route.class.getName());
-    defaultEntry.setKey("route");
-    defaultEntry.setName("Route");
-    PluginType<Route> defaultType = new PluginType<Route>(defaultEntry, Route.class, "Route");
-    Node nullNode = new Node(null, "Route", defaultType);
-    nullNode.getChildren().add(nullChildNode);
-    Route defaultRoute = Route.createRoute(null, "${ctx:queryId}", nullNode);
-
-    // Create queryId based route
-    PluginEntry entry = new PluginEntry();
-    entry.setClassName(Route.class.getName());
-    entry.setKey("route");
-    entry.setName("Route");
-    PluginType<Route> type = new PluginType<Route>(entry, Route.class, "Route");
-    Node node = new Node(null, "Route", type);
-
-    PluginEntry childEntry = new PluginEntry();
-    childEntry.setClassName(RandomAccessFileAppender.class.getName());
-    childEntry.setKey("randomaccessfile");
-    childEntry.setName("appender");
-    PluginType<RandomAccessFileAppender> childType = new PluginType<RandomAccessFileAppender>(childEntry, RandomAccessFileAppender.class, "appender");
-    Node childNode = new Node(node, "RandomAccessFile", childType);
-    childNode.getAttributes().put("name", "query-file-appender");
-    childNode.getAttributes().put("fileName", logLocation + "/${ctx:sessionId}/${ctx:queryId}");
-    node.getChildren().add(childNode);
-
-    PluginEntry filterEntry = new PluginEntry();
-    filterEntry.setClassName(NameFilter.class.getName());
-    filterEntry.setKey("namefilter");
-    filterEntry.setName("namefilter");
-    PluginType<NameFilter> filterType = new PluginType<NameFilter>(filterEntry, NameFilter.class, "filter");
-    Node filterNode = new Node(childNode, "NameFilter", filterType);
-    filterNode.getAttributes().put("loggingLevel", loggingMode.name());
-    childNode.getChildren().add(filterNode);
-
-    PluginEntry layoutEntry = new PluginEntry();
-    layoutEntry.setClassName(PatternLayout.class.getName());
-    layoutEntry.setKey("patternlayout");
-    layoutEntry.setName("layout");
-    PluginType<PatternLayout> layoutType = new PluginType<PatternLayout>(layoutEntry, PatternLayout.class, "layout");
-    Node layoutNode = new Node(childNode, "PatternLayout", layoutType);
-    layoutNode.getAttributes().put("pattern", layout);
-    childNode.getChildren().add(layoutNode);
-
-    Route mdcRoute = Route.createRoute(null, null, node);
-    Routes routes = Routes.createRoutes("${ctx:queryId}", defaultRoute, mdcRoute);
-
-    LoggerContext context = (LoggerContext) LogManager.getContext(false);
-    Configuration configuration = context.getConfiguration();
-
-    RoutingAppender routingAppender = RoutingAppender.createAppender("query-routing",
-        "true",
-        routes,
-        configuration,
-        null,
-        null,
-        null);
-
-    LoggerConfig loggerConfig = configuration.getRootLogger();
-    loggerConfig.addAppender(routingAppender, null, null);
-    context.updateLoggers();
-    routingAppender.start();
-  }
-}